source: code/trunk/downstream.go@ 67

Last change on this file since 67 was 66, checked in by contact, 5 years ago

Properly handle PING messages

File size: 9.3 KB
RevLine 
[13]1package jounce
2
3import (
4 "fmt"
5 "io"
6 "net"
[39]7 "strings"
[13]8
9 "gopkg.in/irc.v3"
10)
11
12type ircError struct {
13 Message *irc.Message
14}
15
16func newUnknownCommandError(cmd string) ircError {
17 return ircError{&irc.Message{
18 Command: irc.ERR_UNKNOWNCOMMAND,
19 Params: []string{
20 "*",
21 cmd,
22 "Unknown command",
23 },
24 }}
25}
26
27func newNeedMoreParamsError(cmd string) ircError {
28 return ircError{&irc.Message{
29 Command: irc.ERR_NEEDMOREPARAMS,
30 Params: []string{
31 "*",
32 cmd,
33 "Not enough parameters",
34 },
35 }}
36}
37
38func (err ircError) Error() string {
39 return err.Message.String()
40}
41
42type downstreamConn struct {
[57]43 net net.Conn
44 irc *irc.Conn
45 srv *Server
46 logger Logger
47 messages chan *irc.Message
48 consumers chan *RingConsumer
49 closed chan struct{}
[22]50
[13]51 registered bool
[37]52 user *user
[13]53 nick string
54 username string
55 realname string
56}
57
[22]58func newDownstreamConn(srv *Server, netConn net.Conn) *downstreamConn {
[55]59 dc := &downstreamConn{
[57]60 net: netConn,
61 irc: irc.NewConn(netConn),
62 srv: srv,
63 logger: &prefixLogger{srv.Logger, fmt.Sprintf("downstream %q: ", netConn.RemoteAddr())},
64 messages: make(chan *irc.Message, 64),
65 consumers: make(chan *RingConsumer),
66 closed: make(chan struct{}),
[22]67 }
[26]68
69 go func() {
[56]70 if err := dc.writeMessages(); err != nil {
71 dc.logger.Printf("failed to write message: %v", err)
[26]72 }
[55]73 if err := dc.net.Close(); err != nil {
74 dc.logger.Printf("failed to close connection: %v", err)
[45]75 } else {
[55]76 dc.logger.Printf("connection closed")
[45]77 }
[26]78 }()
79
[55]80 return dc
[22]81}
82
[55]83func (dc *downstreamConn) prefix() *irc.Prefix {
[27]84 return &irc.Prefix{
[55]85 Name: dc.nick,
86 User: dc.username,
[27]87 // TODO: fill the host?
88 }
89}
90
[57]91func (dc *downstreamConn) isClosed() bool {
92 select {
93 case <-dc.closed:
94 return true
95 default:
96 return false
97 }
98}
99
[55]100func (dc *downstreamConn) readMessages() error {
101 dc.logger.Printf("new connection")
[22]102
103 for {
[55]104 msg, err := dc.irc.ReadMessage()
[22]105 if err == io.EOF {
106 break
107 } else if err != nil {
108 return fmt.Errorf("failed to read IRC command: %v", err)
109 }
110
[64]111 if dc.srv.Debug {
112 dc.logger.Printf("received: %v", msg)
113 }
114
[55]115 err = dc.handleMessage(msg)
[22]116 if ircErr, ok := err.(ircError); ok {
[55]117 ircErr.Message.Prefix = dc.srv.prefix()
118 dc.SendMessage(ircErr.Message)
[22]119 } else if err != nil {
120 return fmt.Errorf("failed to handle IRC command %q: %v", msg.Command, err)
121 }
122
[57]123 if dc.isClosed() {
[22]124 return nil
125 }
126 }
127
[45]128 return nil
[22]129}
130
[56]131func (dc *downstreamConn) writeMessages() error {
[57]132 for {
133 var err error
134 var closed bool
135 select {
136 case msg := <-dc.messages:
[64]137 if dc.srv.Debug {
138 dc.logger.Printf("sent: %v", msg)
139 }
[57]140 err = dc.irc.WriteMessage(msg)
141 case consumer := <-dc.consumers:
142 for {
143 msg := consumer.Peek()
144 if msg == nil {
145 break
146 }
[64]147 if dc.srv.Debug {
148 dc.logger.Printf("sent: %v", msg)
149 }
[57]150 err = dc.irc.WriteMessage(msg)
151 if err != nil {
152 break
153 }
154 consumer.Consume()
155 }
156 case <-dc.closed:
157 closed = true
158 }
159 if err != nil {
[56]160 return err
161 }
[57]162 if closed {
163 break
164 }
[56]165 }
166 return nil
167}
168
[55]169func (dc *downstreamConn) Close() error {
[57]170 if dc.isClosed() {
[26]171 return fmt.Errorf("downstream connection already closed")
172 }
[40]173
[55]174 if u := dc.user; u != nil {
[40]175 u.lock.Lock()
176 for i := range u.downstreamConns {
[55]177 if u.downstreamConns[i] == dc {
[40]178 u.downstreamConns = append(u.downstreamConns[:i], u.downstreamConns[i+1:]...)
[63]179 break
[40]180 }
181 }
182 u.lock.Unlock()
[13]183 }
[40]184
[57]185 close(dc.closed)
[45]186 return nil
[13]187}
188
[55]189func (dc *downstreamConn) SendMessage(msg *irc.Message) {
190 dc.messages <- msg
[54]191}
192
[55]193func (dc *downstreamConn) handleMessage(msg *irc.Message) error {
[13]194 switch msg.Command {
[28]195 case "QUIT":
[55]196 return dc.Close()
[13]197 case "PING":
[66]198 var from, to string
199 if len(msg.Params) >= 1 {
200 from = msg.Params[0]
201 }
202 if len(msg.Params) >= 2 {
203 to = msg.Params[1]
204 }
205
206 if to != "" && to != dc.srv.Hostname {
207 return ircError{&irc.Message{
208 Command: irc.ERR_NOSUCHSERVER,
209 Params: []string{to, "No such server"},
210 }}
211 }
212
213 params := []string{dc.srv.Hostname}
214 if from != "" {
215 params = append(params, from)
216 }
[55]217 dc.SendMessage(&irc.Message{
218 Prefix: dc.srv.prefix(),
[13]219 Command: "PONG",
[66]220 Params: params,
[54]221 })
[26]222 return nil
[13]223 default:
[55]224 if dc.registered {
225 return dc.handleMessageRegistered(msg)
[13]226 } else {
[55]227 return dc.handleMessageUnregistered(msg)
[13]228 }
229 }
230}
231
[55]232func (dc *downstreamConn) handleMessageUnregistered(msg *irc.Message) error {
[13]233 switch msg.Command {
234 case "NICK":
[55]235 if err := parseMessageParams(msg, &dc.nick); err != nil {
[43]236 return err
[13]237 }
238 case "USER":
[43]239 var username string
[55]240 if err := parseMessageParams(msg, &username, nil, nil, &dc.realname); err != nil {
[43]241 return err
[13]242 }
[55]243 dc.username = "~" + username
[13]244 default:
[55]245 dc.logger.Printf("unhandled message: %v", msg)
[13]246 return newUnknownCommandError(msg.Command)
247 }
[55]248 if dc.username != "" && dc.nick != "" {
249 return dc.register()
[13]250 }
251 return nil
252}
253
[55]254func (dc *downstreamConn) register() error {
255 u := dc.srv.getUser(strings.TrimPrefix(dc.username, "~"))
[38]256 if u == nil {
[55]257 dc.logger.Printf("failed authentication: unknown username %q", dc.username)
258 dc.SendMessage(&irc.Message{
259 Prefix: dc.srv.prefix(),
[37]260 Command: irc.ERR_PASSWDMISMATCH,
261 Params: []string{"*", "Invalid username or password"},
[54]262 })
[37]263 return nil
264 }
265
[55]266 dc.registered = true
267 dc.user = u
[13]268
[40]269 u.lock.Lock()
[57]270 firstDownstream := len(u.downstreamConns) == 0
[55]271 u.downstreamConns = append(u.downstreamConns, dc)
[40]272 u.lock.Unlock()
273
[55]274 dc.SendMessage(&irc.Message{
275 Prefix: dc.srv.prefix(),
[13]276 Command: irc.RPL_WELCOME,
[55]277 Params: []string{dc.nick, "Welcome to jounce, " + dc.nick},
[54]278 })
[55]279 dc.SendMessage(&irc.Message{
280 Prefix: dc.srv.prefix(),
[13]281 Command: irc.RPL_YOURHOST,
[55]282 Params: []string{dc.nick, "Your host is " + dc.srv.Hostname},
[54]283 })
[55]284 dc.SendMessage(&irc.Message{
285 Prefix: dc.srv.prefix(),
[13]286 Command: irc.RPL_CREATED,
[55]287 Params: []string{dc.nick, "Who cares when the server was created?"},
[54]288 })
[55]289 dc.SendMessage(&irc.Message{
290 Prefix: dc.srv.prefix(),
[13]291 Command: irc.RPL_MYINFO,
[55]292 Params: []string{dc.nick, dc.srv.Hostname, "jounce", "aiwroO", "OovaimnqpsrtklbeI"},
[54]293 })
[55]294 dc.SendMessage(&irc.Message{
295 Prefix: dc.srv.prefix(),
[13]296 Command: irc.ERR_NOMOTD,
[55]297 Params: []string{dc.nick, "No MOTD"},
[54]298 })
[13]299
[39]300 u.forEachUpstream(func(uc *upstreamConn) {
[30]301 // TODO: fix races accessing upstream connection data
302 for _, ch := range uc.channels {
303 if ch.complete {
[55]304 forwardChannel(dc, ch)
[30]305 }
306 }
[50]307
[51]308 // TODO: let clients specify the ring buffer name in their username
[57]309 historyName := ""
310
311 var seqPtr *uint64
312 if firstDownstream {
313 seq, ok := uc.history[historyName]
314 if ok {
315 seqPtr = &seq
[50]316 }
317 }
[57]318
[59]319 consumer, ch := uc.ring.NewConsumer(seqPtr)
[57]320 go func() {
321 for {
322 var closed bool
323 select {
324 case <-ch:
325 dc.consumers <- consumer
326 case <-dc.closed:
327 closed = true
328 }
329 if closed {
330 break
331 }
332 }
333
334 seq := consumer.Close()
335
336 dc.user.lock.Lock()
337 lastDownstream := len(dc.user.downstreamConns) == 0
338 dc.user.lock.Unlock()
339
340 if lastDownstream {
341 uc.history[historyName] = seq
342 }
343 }()
[39]344 })
[50]345
[13]346 return nil
347}
348
[55]349func (dc *downstreamConn) handleMessageRegistered(msg *irc.Message) error {
[13]350 switch msg.Command {
[42]351 case "USER":
[13]352 return ircError{&irc.Message{
353 Command: irc.ERR_ALREADYREGISTERED,
[55]354 Params: []string{dc.nick, "You may not reregister"},
[13]355 }}
[42]356 case "NICK":
[55]357 dc.user.forEachUpstream(func(uc *upstreamConn) {
[60]358 uc.SendMessage(msg)
[42]359 })
[48]360 case "JOIN":
361 var name string
362 if err := parseMessageParams(msg, &name); err != nil {
363 return err
364 }
365
[55]366 if ch, _ := dc.user.getChannel(name); ch != nil {
[48]367 break // already joined
368 }
369
370 // TODO: extract network name from channel name
371 return ircError{&irc.Message{
372 Command: irc.ERR_NOSUCHCHANNEL,
373 Params: []string{name, "Channel name ambiguous"},
374 }}
[49]375 case "PART":
376 var name string
377 if err := parseMessageParams(msg, &name); err != nil {
378 return err
379 }
380
[55]381 ch, err := dc.user.getChannel(name)
[49]382 if err != nil {
383 return err
384 }
385
[60]386 ch.conn.SendMessage(msg)
[49]387 // TODO: remove channel from upstream config
[46]388 case "MODE":
389 var name string
390 if err := parseMessageParams(msg, &name); err != nil {
391 return err
392 }
393
394 var modeStr string
395 if len(msg.Params) > 1 {
396 modeStr = msg.Params[1]
397 }
398
399 if msg.Prefix.Name != name {
[55]400 ch, err := dc.user.getChannel(name)
[46]401 if err != nil {
402 return err
403 }
404
405 if modeStr != "" {
[60]406 ch.conn.SendMessage(msg)
[46]407 } else {
[55]408 dc.SendMessage(&irc.Message{
409 Prefix: dc.srv.prefix(),
[46]410 Command: irc.RPL_CHANNELMODEIS,
411 Params: []string{ch.Name, string(ch.modes)},
[54]412 })
[46]413 }
414 } else {
[55]415 if name != dc.nick {
[46]416 return ircError{&irc.Message{
417 Command: irc.ERR_USERSDONTMATCH,
[55]418 Params: []string{dc.nick, "Cannot change mode for other users"},
[46]419 }}
420 }
421
422 if modeStr != "" {
[55]423 dc.user.forEachUpstream(func(uc *upstreamConn) {
[60]424 uc.SendMessage(msg)
[46]425 })
426 } else {
[55]427 dc.SendMessage(&irc.Message{
428 Prefix: dc.srv.prefix(),
[46]429 Command: irc.RPL_UMODEIS,
430 Params: []string{""}, // TODO
[54]431 })
[46]432 }
433 }
[58]434 case "PRIVMSG":
435 var targetsStr, text string
436 if err := parseMessageParams(msg, &targetsStr, &text); err != nil {
437 return err
438 }
439
440 for _, name := range strings.Split(targetsStr, ",") {
441 ch, err := dc.user.getChannel(name)
442 if err != nil {
443 return err
444 }
445
[60]446 ch.conn.SendMessage(&irc.Message{
[58]447 Prefix: msg.Prefix,
448 Command: "PRIVMSG",
449 Params: []string{name, text},
[60]450 })
[58]451 }
[13]452 default:
[55]453 dc.logger.Printf("unhandled message: %v", msg)
[13]454 return newUnknownCommandError(msg.Command)
455 }
[42]456 return nil
[13]457}
Note: See TracBrowser for help on using the repository browser.