source: code/trunk/downstream.go@ 63

Last change on this file since 63 was 63, checked in by contact, 5 years ago

Fix panic when closing downstream connection

File size: 8.8 KB
RevLine 
[13]1package jounce
2
3import (
4 "fmt"
5 "io"
6 "net"
[39]7 "strings"
[13]8
9 "gopkg.in/irc.v3"
10)
11
12type ircError struct {
13 Message *irc.Message
14}
15
16func newUnknownCommandError(cmd string) ircError {
17 return ircError{&irc.Message{
18 Command: irc.ERR_UNKNOWNCOMMAND,
19 Params: []string{
20 "*",
21 cmd,
22 "Unknown command",
23 },
24 }}
25}
26
27func newNeedMoreParamsError(cmd string) ircError {
28 return ircError{&irc.Message{
29 Command: irc.ERR_NEEDMOREPARAMS,
30 Params: []string{
31 "*",
32 cmd,
33 "Not enough parameters",
34 },
35 }}
36}
37
38func (err ircError) Error() string {
39 return err.Message.String()
40}
41
42type downstreamConn struct {
[57]43 net net.Conn
44 irc *irc.Conn
45 srv *Server
46 logger Logger
47 messages chan *irc.Message
48 consumers chan *RingConsumer
49 closed chan struct{}
[22]50
[13]51 registered bool
[37]52 user *user
[13]53 nick string
54 username string
55 realname string
56}
57
[22]58func newDownstreamConn(srv *Server, netConn net.Conn) *downstreamConn {
[55]59 dc := &downstreamConn{
[57]60 net: netConn,
61 irc: irc.NewConn(netConn),
62 srv: srv,
63 logger: &prefixLogger{srv.Logger, fmt.Sprintf("downstream %q: ", netConn.RemoteAddr())},
64 messages: make(chan *irc.Message, 64),
65 consumers: make(chan *RingConsumer),
66 closed: make(chan struct{}),
[22]67 }
[26]68
69 go func() {
[56]70 if err := dc.writeMessages(); err != nil {
71 dc.logger.Printf("failed to write message: %v", err)
[26]72 }
[55]73 if err := dc.net.Close(); err != nil {
74 dc.logger.Printf("failed to close connection: %v", err)
[45]75 } else {
[55]76 dc.logger.Printf("connection closed")
[45]77 }
[26]78 }()
79
[55]80 return dc
[22]81}
82
[55]83func (dc *downstreamConn) prefix() *irc.Prefix {
[27]84 return &irc.Prefix{
[55]85 Name: dc.nick,
86 User: dc.username,
[27]87 // TODO: fill the host?
88 }
89}
90
[57]91func (dc *downstreamConn) isClosed() bool {
92 select {
93 case <-dc.closed:
94 return true
95 default:
96 return false
97 }
98}
99
[55]100func (dc *downstreamConn) readMessages() error {
101 dc.logger.Printf("new connection")
[22]102
103 for {
[55]104 msg, err := dc.irc.ReadMessage()
[22]105 if err == io.EOF {
106 break
107 } else if err != nil {
108 return fmt.Errorf("failed to read IRC command: %v", err)
109 }
110
[55]111 err = dc.handleMessage(msg)
[22]112 if ircErr, ok := err.(ircError); ok {
[55]113 ircErr.Message.Prefix = dc.srv.prefix()
114 dc.SendMessage(ircErr.Message)
[22]115 } else if err != nil {
116 return fmt.Errorf("failed to handle IRC command %q: %v", msg.Command, err)
117 }
118
[57]119 if dc.isClosed() {
[22]120 return nil
121 }
122 }
123
[45]124 return nil
[22]125}
126
[56]127func (dc *downstreamConn) writeMessages() error {
[57]128 for {
129 var err error
130 var closed bool
131 select {
132 case msg := <-dc.messages:
133 err = dc.irc.WriteMessage(msg)
134 case consumer := <-dc.consumers:
135 for {
136 msg := consumer.Peek()
137 if msg == nil {
138 break
139 }
140 err = dc.irc.WriteMessage(msg)
141 if err != nil {
142 break
143 }
144 consumer.Consume()
145 }
146 case <-dc.closed:
147 closed = true
148 }
149 if err != nil {
[56]150 return err
151 }
[57]152 if closed {
153 break
154 }
[56]155 }
156 return nil
157}
158
[55]159func (dc *downstreamConn) Close() error {
[57]160 if dc.isClosed() {
[26]161 return fmt.Errorf("downstream connection already closed")
162 }
[40]163
[55]164 if u := dc.user; u != nil {
[40]165 u.lock.Lock()
166 for i := range u.downstreamConns {
[55]167 if u.downstreamConns[i] == dc {
[40]168 u.downstreamConns = append(u.downstreamConns[:i], u.downstreamConns[i+1:]...)
[63]169 break
[40]170 }
171 }
172 u.lock.Unlock()
[13]173 }
[40]174
[57]175 close(dc.closed)
[45]176 return nil
[13]177}
178
[55]179func (dc *downstreamConn) SendMessage(msg *irc.Message) {
180 dc.messages <- msg
[54]181}
182
[55]183func (dc *downstreamConn) handleMessage(msg *irc.Message) error {
[13]184 switch msg.Command {
[28]185 case "QUIT":
[55]186 return dc.Close()
[13]187 case "PING":
188 // TODO: handle params
[55]189 dc.SendMessage(&irc.Message{
190 Prefix: dc.srv.prefix(),
[13]191 Command: "PONG",
[55]192 Params: []string{dc.srv.Hostname},
[54]193 })
[26]194 return nil
[13]195 default:
[55]196 if dc.registered {
197 return dc.handleMessageRegistered(msg)
[13]198 } else {
[55]199 return dc.handleMessageUnregistered(msg)
[13]200 }
201 }
202}
203
[55]204func (dc *downstreamConn) handleMessageUnregistered(msg *irc.Message) error {
[13]205 switch msg.Command {
206 case "NICK":
[55]207 if err := parseMessageParams(msg, &dc.nick); err != nil {
[43]208 return err
[13]209 }
210 case "USER":
[43]211 var username string
[55]212 if err := parseMessageParams(msg, &username, nil, nil, &dc.realname); err != nil {
[43]213 return err
[13]214 }
[55]215 dc.username = "~" + username
[13]216 default:
[55]217 dc.logger.Printf("unhandled message: %v", msg)
[13]218 return newUnknownCommandError(msg.Command)
219 }
[55]220 if dc.username != "" && dc.nick != "" {
221 return dc.register()
[13]222 }
223 return nil
224}
225
[55]226func (dc *downstreamConn) register() error {
227 u := dc.srv.getUser(strings.TrimPrefix(dc.username, "~"))
[38]228 if u == nil {
[55]229 dc.logger.Printf("failed authentication: unknown username %q", dc.username)
230 dc.SendMessage(&irc.Message{
231 Prefix: dc.srv.prefix(),
[37]232 Command: irc.ERR_PASSWDMISMATCH,
233 Params: []string{"*", "Invalid username or password"},
[54]234 })
[37]235 return nil
236 }
237
[55]238 dc.registered = true
239 dc.user = u
[13]240
[40]241 u.lock.Lock()
[57]242 firstDownstream := len(u.downstreamConns) == 0
[55]243 u.downstreamConns = append(u.downstreamConns, dc)
[40]244 u.lock.Unlock()
245
[55]246 dc.SendMessage(&irc.Message{
247 Prefix: dc.srv.prefix(),
[13]248 Command: irc.RPL_WELCOME,
[55]249 Params: []string{dc.nick, "Welcome to jounce, " + dc.nick},
[54]250 })
[55]251 dc.SendMessage(&irc.Message{
252 Prefix: dc.srv.prefix(),
[13]253 Command: irc.RPL_YOURHOST,
[55]254 Params: []string{dc.nick, "Your host is " + dc.srv.Hostname},
[54]255 })
[55]256 dc.SendMessage(&irc.Message{
257 Prefix: dc.srv.prefix(),
[13]258 Command: irc.RPL_CREATED,
[55]259 Params: []string{dc.nick, "Who cares when the server was created?"},
[54]260 })
[55]261 dc.SendMessage(&irc.Message{
262 Prefix: dc.srv.prefix(),
[13]263 Command: irc.RPL_MYINFO,
[55]264 Params: []string{dc.nick, dc.srv.Hostname, "jounce", "aiwroO", "OovaimnqpsrtklbeI"},
[54]265 })
[55]266 dc.SendMessage(&irc.Message{
267 Prefix: dc.srv.prefix(),
[13]268 Command: irc.ERR_NOMOTD,
[55]269 Params: []string{dc.nick, "No MOTD"},
[54]270 })
[13]271
[39]272 u.forEachUpstream(func(uc *upstreamConn) {
[30]273 // TODO: fix races accessing upstream connection data
274 for _, ch := range uc.channels {
275 if ch.complete {
[55]276 forwardChannel(dc, ch)
[30]277 }
278 }
[50]279
[51]280 // TODO: let clients specify the ring buffer name in their username
[57]281 historyName := ""
282
283 var seqPtr *uint64
284 if firstDownstream {
285 seq, ok := uc.history[historyName]
286 if ok {
287 seqPtr = &seq
[50]288 }
289 }
[57]290
[59]291 consumer, ch := uc.ring.NewConsumer(seqPtr)
[57]292 go func() {
293 for {
294 var closed bool
295 select {
296 case <-ch:
297 dc.consumers <- consumer
298 case <-dc.closed:
299 closed = true
300 }
301 if closed {
302 break
303 }
304 }
305
306 seq := consumer.Close()
307
308 dc.user.lock.Lock()
309 lastDownstream := len(dc.user.downstreamConns) == 0
310 dc.user.lock.Unlock()
311
312 if lastDownstream {
313 uc.history[historyName] = seq
314 }
315 }()
[39]316 })
[50]317
[13]318 return nil
319}
320
[55]321func (dc *downstreamConn) handleMessageRegistered(msg *irc.Message) error {
[13]322 switch msg.Command {
[42]323 case "USER":
[13]324 return ircError{&irc.Message{
325 Command: irc.ERR_ALREADYREGISTERED,
[55]326 Params: []string{dc.nick, "You may not reregister"},
[13]327 }}
[42]328 case "NICK":
[55]329 dc.user.forEachUpstream(func(uc *upstreamConn) {
[60]330 uc.SendMessage(msg)
[42]331 })
[48]332 case "JOIN":
333 var name string
334 if err := parseMessageParams(msg, &name); err != nil {
335 return err
336 }
337
[55]338 if ch, _ := dc.user.getChannel(name); ch != nil {
[48]339 break // already joined
340 }
341
342 // TODO: extract network name from channel name
343 return ircError{&irc.Message{
344 Command: irc.ERR_NOSUCHCHANNEL,
345 Params: []string{name, "Channel name ambiguous"},
346 }}
[49]347 case "PART":
348 var name string
349 if err := parseMessageParams(msg, &name); err != nil {
350 return err
351 }
352
[55]353 ch, err := dc.user.getChannel(name)
[49]354 if err != nil {
355 return err
356 }
357
[60]358 ch.conn.SendMessage(msg)
[49]359 // TODO: remove channel from upstream config
[46]360 case "MODE":
361 var name string
362 if err := parseMessageParams(msg, &name); err != nil {
363 return err
364 }
365
366 var modeStr string
367 if len(msg.Params) > 1 {
368 modeStr = msg.Params[1]
369 }
370
371 if msg.Prefix.Name != name {
[55]372 ch, err := dc.user.getChannel(name)
[46]373 if err != nil {
374 return err
375 }
376
377 if modeStr != "" {
[60]378 ch.conn.SendMessage(msg)
[46]379 } else {
[55]380 dc.SendMessage(&irc.Message{
381 Prefix: dc.srv.prefix(),
[46]382 Command: irc.RPL_CHANNELMODEIS,
383 Params: []string{ch.Name, string(ch.modes)},
[54]384 })
[46]385 }
386 } else {
[55]387 if name != dc.nick {
[46]388 return ircError{&irc.Message{
389 Command: irc.ERR_USERSDONTMATCH,
[55]390 Params: []string{dc.nick, "Cannot change mode for other users"},
[46]391 }}
392 }
393
394 if modeStr != "" {
[55]395 dc.user.forEachUpstream(func(uc *upstreamConn) {
[60]396 uc.SendMessage(msg)
[46]397 })
398 } else {
[55]399 dc.SendMessage(&irc.Message{
400 Prefix: dc.srv.prefix(),
[46]401 Command: irc.RPL_UMODEIS,
402 Params: []string{""}, // TODO
[54]403 })
[46]404 }
405 }
[58]406 case "PRIVMSG":
407 var targetsStr, text string
408 if err := parseMessageParams(msg, &targetsStr, &text); err != nil {
409 return err
410 }
411
412 for _, name := range strings.Split(targetsStr, ",") {
413 ch, err := dc.user.getChannel(name)
414 if err != nil {
415 return err
416 }
417
[60]418 ch.conn.SendMessage(&irc.Message{
[58]419 Prefix: msg.Prefix,
420 Command: "PRIVMSG",
421 Params: []string{name, text},
[60]422 })
[58]423 }
[13]424 default:
[55]425 dc.logger.Printf("unhandled message: %v", msg)
[13]426 return newUnknownCommandError(msg.Command)
427 }
[42]428 return nil
[13]429}
Note: See TracBrowser for help on using the repository browser.