source: code/trunk/downstream.go@ 177

Last change on this file since 177 was 177, checked in by delthas, 5 years ago

Add LIST support

This commit adds support for downstream LIST messages from multiple
concurrent downstreams to multiple concurrent upstreams, including
support for multiple pending LIST requests from the same downstream.

Because exactly one RPL_LISTEND message must be sent to the requesting
downstream, and because there might be multiple upstreams each sending
their own RPL_LISTEND, some sort of cache of RPL_LISTEND replies is
required to match them together so that only one is sent back
downstream.

This commit adds a list of "pending LIST" structs, which each contain a
map of all upstreams that yet need to send a RPL_LISTEND, and the
corresponding LIST request associated with that response. This list of
pending LISTs is sorted according to the order that the requesting
downstreams sent the LIST messages in. Each pending set also stores the
id of the requesting downstream, in order to only forward the replies to
it and no other downstream. (This is important because LIST replies can
typically amount to several thousand messages on large servers.)

When a single downstream makes multiple LIST requests, only the first
one will be immediately sent to the upstream servers. The next ones will
be buffered until the first one is completed. Distinct downstreams can
make concurrent LIST requests without any request buffering.

Each RPL_LIST message is forwarded to the downstream of the first
matching pending LIST struct.

When an upstream sends an RPL_LISTEND message, the upstream is removed
from the first matching pending LIST struct, but that message is not
immediately forwarded downstream. If that struct is then empty (no
upstream remains pending in it), that means all upstreams have
sent back all their RPL_LISTEND replies (which means they also sent all
their RPL_LIST replies); so a unique RPL_LISTEND is sent to downstream
and that pending LIST set is removed from the cache.

Upstreams are removed from the pending LIST structs in two other cases:

  • when they are closed (to avoid stalling because of a disconnected
    upstream that will never reply to the LIST message): they are removed
    from all pending LIST structs

  • when they reply with an ERR_UNKNOWNCOMMAND or RPL_TRYAGAIN LIST reply,
    which is typically used when a user is not allowed to LIST because they
    just joined the server: they are removed from the first pending LIST
    struct, as if an RPL_LISTEND message was received

File size: 31.2 KB
Line 
1package soju
2
3import (
4 "crypto/tls"
5 "encoding/base64"
6 "fmt"
7 "io"
8 "net"
9 "strconv"
10 "strings"
11 "sync"
12 "time"
13
14 "github.com/emersion/go-sasl"
15 "golang.org/x/crypto/bcrypt"
16 "gopkg.in/irc.v3"
17)
18
19type ircError struct {
20 Message *irc.Message
21}
22
23func (err ircError) Error() string {
24 return err.Message.String()
25}
26
27func newUnknownCommandError(cmd string) ircError {
28 return ircError{&irc.Message{
29 Command: irc.ERR_UNKNOWNCOMMAND,
30 Params: []string{
31 "*",
32 cmd,
33 "Unknown command",
34 },
35 }}
36}
37
38func newNeedMoreParamsError(cmd string) ircError {
39 return ircError{&irc.Message{
40 Command: irc.ERR_NEEDMOREPARAMS,
41 Params: []string{
42 "*",
43 cmd,
44 "Not enough parameters",
45 },
46 }}
47}
48
49var errAuthFailed = ircError{&irc.Message{
50 Command: irc.ERR_PASSWDMISMATCH,
51 Params: []string{"*", "Invalid username or password"},
52}}
53
54type ringMessage struct {
55 consumer *RingConsumer
56 upstreamConn *upstreamConn
57}
58
59type downstreamConn struct {
60 id uint64
61 net net.Conn
62 irc *irc.Conn
63 srv *Server
64 logger Logger
65 outgoing chan *irc.Message
66 ringMessages chan ringMessage
67 closed chan struct{}
68
69 registered bool
70 user *user
71 nick string
72 username string
73 rawUsername string
74 networkName string
75 realname string
76 hostname string
77 password string // empty after authentication
78 network *network // can be nil
79
80 negociatingCaps bool
81 capVersion int
82 caps map[string]bool
83
84 saslServer sasl.Server
85
86 lock sync.Mutex
87 ourMessages map[*irc.Message]struct{}
88}
89
90func newDownstreamConn(srv *Server, netConn net.Conn, id uint64) *downstreamConn {
91 dc := &downstreamConn{
92 id: id,
93 net: netConn,
94 irc: irc.NewConn(netConn),
95 srv: srv,
96 logger: &prefixLogger{srv.Logger, fmt.Sprintf("downstream %q: ", netConn.RemoteAddr())},
97 outgoing: make(chan *irc.Message, 64),
98 ringMessages: make(chan ringMessage),
99 closed: make(chan struct{}),
100 caps: make(map[string]bool),
101 ourMessages: make(map[*irc.Message]struct{}),
102 }
103 dc.hostname = netConn.RemoteAddr().String()
104 if host, _, err := net.SplitHostPort(dc.hostname); err == nil {
105 dc.hostname = host
106 }
107
108 go func() {
109 if err := dc.writeMessages(); err != nil {
110 dc.logger.Printf("failed to write message: %v", err)
111 }
112 if err := dc.net.Close(); err != nil {
113 dc.logger.Printf("failed to close connection: %v", err)
114 } else {
115 dc.logger.Printf("connection closed")
116 }
117 }()
118
119 dc.logger.Printf("new connection")
120 return dc
121}
122
123func (dc *downstreamConn) prefix() *irc.Prefix {
124 return &irc.Prefix{
125 Name: dc.nick,
126 User: dc.username,
127 Host: dc.hostname,
128 }
129}
130
131func (dc *downstreamConn) forEachNetwork(f func(*network)) {
132 if dc.network != nil {
133 f(dc.network)
134 } else {
135 dc.user.forEachNetwork(f)
136 }
137}
138
139func (dc *downstreamConn) forEachUpstream(f func(*upstreamConn)) {
140 dc.user.forEachUpstream(func(uc *upstreamConn) {
141 if dc.network != nil && uc.network != dc.network {
142 return
143 }
144 f(uc)
145 })
146}
147
148// upstream returns the upstream connection, if any. If there are zero or if
149// there are multiple upstream connections, it returns nil.
150func (dc *downstreamConn) upstream() *upstreamConn {
151 if dc.network == nil {
152 return nil
153 }
154 return dc.network.upstream()
155}
156
157func (dc *downstreamConn) marshalEntity(uc *upstreamConn, entity string) string {
158 if uc.isChannel(entity) {
159 return dc.marshalChannel(uc, entity)
160 }
161 return dc.marshalNick(uc, entity)
162}
163
164func (dc *downstreamConn) marshalChannel(uc *upstreamConn, name string) string {
165 if dc.network != nil {
166 return name
167 }
168 return name + "/" + uc.network.GetName()
169}
170
171func (dc *downstreamConn) unmarshalEntity(name string) (*upstreamConn, string, error) {
172 if uc := dc.upstream(); uc != nil {
173 return uc, name, nil
174 }
175
176 var conn *upstreamConn
177 if i := strings.LastIndexByte(name, '/'); i >= 0 {
178 network := name[i+1:]
179 name = name[:i]
180
181 dc.forEachUpstream(func(uc *upstreamConn) {
182 if network != uc.network.GetName() {
183 return
184 }
185 conn = uc
186 })
187 }
188
189 if conn == nil {
190 return nil, "", ircError{&irc.Message{
191 Command: irc.ERR_NOSUCHCHANNEL,
192 Params: []string{name, "No such channel"},
193 }}
194 }
195 return conn, name, nil
196}
197
198func (dc *downstreamConn) marshalNick(uc *upstreamConn, nick string) string {
199 if nick == uc.nick {
200 return dc.nick
201 }
202 if dc.network != nil {
203 return nick
204 }
205 return nick + "/" + uc.network.GetName()
206}
207
208func (dc *downstreamConn) marshalUserPrefix(uc *upstreamConn, prefix *irc.Prefix) *irc.Prefix {
209 if prefix.Name == uc.nick {
210 return dc.prefix()
211 }
212 if dc.network != nil {
213 return prefix
214 }
215 return &irc.Prefix{
216 Name: prefix.Name + "/" + uc.network.GetName(),
217 User: prefix.User,
218 Host: prefix.Host,
219 }
220}
221
222func (dc *downstreamConn) isClosed() bool {
223 select {
224 case <-dc.closed:
225 return true
226 default:
227 return false
228 }
229}
230
231func (dc *downstreamConn) readMessages(ch chan<- event) error {
232 for {
233 msg, err := dc.irc.ReadMessage()
234 if err == io.EOF {
235 break
236 } else if err != nil {
237 return fmt.Errorf("failed to read IRC command: %v", err)
238 }
239
240 if dc.srv.Debug {
241 dc.logger.Printf("received: %v", msg)
242 }
243
244 ch <- eventDownstreamMessage{msg, dc}
245 }
246
247 return nil
248}
249
250func (dc *downstreamConn) writeMessages() error {
251 for {
252 var err error
253 var closed bool
254 select {
255 case msg := <-dc.outgoing:
256 if dc.srv.Debug {
257 dc.logger.Printf("sent: %v", msg)
258 }
259 err = dc.irc.WriteMessage(msg)
260 case ringMessage := <-dc.ringMessages:
261 consumer, uc := ringMessage.consumer, ringMessage.upstreamConn
262 for {
263 msg := consumer.Peek()
264 if msg == nil {
265 break
266 }
267
268 dc.lock.Lock()
269 _, ours := dc.ourMessages[msg]
270 delete(dc.ourMessages, msg)
271 dc.lock.Unlock()
272 if ours {
273 // The message comes from our connection, don't echo it
274 // back
275 consumer.Consume()
276 continue
277 }
278
279 msg = msg.Copy()
280 switch msg.Command {
281 case "PRIVMSG":
282 msg.Prefix = dc.marshalUserPrefix(uc, msg.Prefix)
283 msg.Params[0] = dc.marshalEntity(uc, msg.Params[0])
284 default:
285 panic("expected to consume a PRIVMSG message")
286 }
287 if dc.srv.Debug {
288 dc.logger.Printf("sent: %v", msg)
289 }
290 err = dc.irc.WriteMessage(msg)
291 if err != nil {
292 break
293 }
294 consumer.Consume()
295 }
296 case <-dc.closed:
297 closed = true
298 }
299 if err != nil {
300 return err
301 }
302 if closed {
303 break
304 }
305 }
306 return nil
307}
308
309func (dc *downstreamConn) Close() error {
310 if dc.isClosed() {
311 return fmt.Errorf("downstream connection already closed")
312 }
313
314 close(dc.closed)
315 return nil
316}
317
318func (dc *downstreamConn) SendMessage(msg *irc.Message) {
319 dc.outgoing <- msg
320}
321
322func (dc *downstreamConn) handleMessage(msg *irc.Message) error {
323 switch msg.Command {
324 case "QUIT":
325 return dc.Close()
326 default:
327 if dc.registered {
328 return dc.handleMessageRegistered(msg)
329 } else {
330 return dc.handleMessageUnregistered(msg)
331 }
332 }
333}
334
335func (dc *downstreamConn) handleMessageUnregistered(msg *irc.Message) error {
336 switch msg.Command {
337 case "NICK":
338 var nick string
339 if err := parseMessageParams(msg, &nick); err != nil {
340 return err
341 }
342 if nick == serviceNick {
343 return ircError{&irc.Message{
344 Command: irc.ERR_NICKNAMEINUSE,
345 Params: []string{dc.nick, nick, "Nickname reserved for bouncer service"},
346 }}
347 }
348 dc.nick = nick
349 case "USER":
350 if err := parseMessageParams(msg, &dc.rawUsername, nil, nil, &dc.realname); err != nil {
351 return err
352 }
353 case "PASS":
354 if err := parseMessageParams(msg, &dc.password); err != nil {
355 return err
356 }
357 case "CAP":
358 var subCmd string
359 if err := parseMessageParams(msg, &subCmd); err != nil {
360 return err
361 }
362 if err := dc.handleCapCommand(subCmd, msg.Params[1:]); err != nil {
363 return err
364 }
365 case "AUTHENTICATE":
366 if !dc.caps["sasl"] {
367 return ircError{&irc.Message{
368 Command: irc.ERR_SASLFAIL,
369 Params: []string{"*", "AUTHENTICATE requires the \"sasl\" capability to be enabled"},
370 }}
371 }
372 if len(msg.Params) == 0 {
373 return ircError{&irc.Message{
374 Command: irc.ERR_SASLFAIL,
375 Params: []string{"*", "Missing AUTHENTICATE argument"},
376 }}
377 }
378 if dc.nick == "" {
379 return ircError{&irc.Message{
380 Command: irc.ERR_SASLFAIL,
381 Params: []string{"*", "Expected NICK command before AUTHENTICATE"},
382 }}
383 }
384
385 var resp []byte
386 if dc.saslServer == nil {
387 mech := strings.ToUpper(msg.Params[0])
388 switch mech {
389 case "PLAIN":
390 dc.saslServer = sasl.NewPlainServer(sasl.PlainAuthenticator(func(identity, username, password string) error {
391 return dc.authenticate(username, password)
392 }))
393 default:
394 return ircError{&irc.Message{
395 Command: irc.ERR_SASLFAIL,
396 Params: []string{"*", fmt.Sprintf("Unsupported SASL mechanism %q", mech)},
397 }}
398 }
399 } else if msg.Params[0] == "*" {
400 dc.saslServer = nil
401 return ircError{&irc.Message{
402 Command: irc.ERR_SASLABORTED,
403 Params: []string{"*", "SASL authentication aborted"},
404 }}
405 } else if msg.Params[0] == "+" {
406 resp = nil
407 } else {
408 // TODO: multi-line messages
409 var err error
410 resp, err = base64.StdEncoding.DecodeString(msg.Params[0])
411 if err != nil {
412 dc.saslServer = nil
413 return ircError{&irc.Message{
414 Command: irc.ERR_SASLFAIL,
415 Params: []string{"*", "Invalid base64-encoded response"},
416 }}
417 }
418 }
419
420 challenge, done, err := dc.saslServer.Next(resp)
421 if err != nil {
422 dc.saslServer = nil
423 if ircErr, ok := err.(ircError); ok && ircErr.Message.Command == irc.ERR_PASSWDMISMATCH {
424 return ircError{&irc.Message{
425 Command: irc.ERR_SASLFAIL,
426 Params: []string{"*", ircErr.Message.Params[1]},
427 }}
428 }
429 dc.SendMessage(&irc.Message{
430 Prefix: dc.srv.prefix(),
431 Command: irc.ERR_SASLFAIL,
432 Params: []string{"*", "SASL error"},
433 })
434 return fmt.Errorf("SASL authentication failed: %v", err)
435 } else if done {
436 dc.saslServer = nil
437 dc.SendMessage(&irc.Message{
438 Prefix: dc.srv.prefix(),
439 Command: irc.RPL_LOGGEDIN,
440 Params: []string{dc.nick, dc.nick, dc.user.Username, "You are now logged in"},
441 })
442 dc.SendMessage(&irc.Message{
443 Prefix: dc.srv.prefix(),
444 Command: irc.RPL_SASLSUCCESS,
445 Params: []string{dc.nick, "SASL authentication successful"},
446 })
447 } else {
448 challengeStr := "+"
449 if len(challenge) > 0 {
450 challengeStr = base64.StdEncoding.EncodeToString(challenge)
451 }
452
453 // TODO: multi-line messages
454 dc.SendMessage(&irc.Message{
455 Prefix: dc.srv.prefix(),
456 Command: "AUTHENTICATE",
457 Params: []string{challengeStr},
458 })
459 }
460 default:
461 dc.logger.Printf("unhandled message: %v", msg)
462 return newUnknownCommandError(msg.Command)
463 }
464 if dc.rawUsername != "" && dc.nick != "" && !dc.negociatingCaps {
465 return dc.register()
466 }
467 return nil
468}
469
470func (dc *downstreamConn) handleCapCommand(cmd string, args []string) error {
471 cmd = strings.ToUpper(cmd)
472
473 replyTo := dc.nick
474 if !dc.registered {
475 replyTo = "*"
476 }
477
478 switch cmd {
479 case "LS":
480 if len(args) > 0 {
481 var err error
482 if dc.capVersion, err = strconv.Atoi(args[0]); err != nil {
483 return err
484 }
485 }
486
487 var caps []string
488 if dc.capVersion >= 302 {
489 caps = append(caps, "sasl=PLAIN")
490 } else {
491 caps = append(caps, "sasl")
492 }
493
494 // TODO: multi-line replies
495 dc.SendMessage(&irc.Message{
496 Prefix: dc.srv.prefix(),
497 Command: "CAP",
498 Params: []string{replyTo, "LS", strings.Join(caps, " ")},
499 })
500
501 if !dc.registered {
502 dc.negociatingCaps = true
503 }
504 case "LIST":
505 var caps []string
506 for name := range dc.caps {
507 caps = append(caps, name)
508 }
509
510 // TODO: multi-line replies
511 dc.SendMessage(&irc.Message{
512 Prefix: dc.srv.prefix(),
513 Command: "CAP",
514 Params: []string{replyTo, "LIST", strings.Join(caps, " ")},
515 })
516 case "REQ":
517 if len(args) == 0 {
518 return ircError{&irc.Message{
519 Command: err_invalidcapcmd,
520 Params: []string{replyTo, cmd, "Missing argument in CAP REQ command"},
521 }}
522 }
523
524 caps := strings.Fields(args[0])
525 ack := true
526 for _, name := range caps {
527 name = strings.ToLower(name)
528 enable := !strings.HasPrefix(name, "-")
529 if !enable {
530 name = strings.TrimPrefix(name, "-")
531 }
532
533 enabled := dc.caps[name]
534 if enable == enabled {
535 continue
536 }
537
538 switch name {
539 case "sasl":
540 dc.caps[name] = enable
541 default:
542 ack = false
543 }
544 }
545
546 reply := "NAK"
547 if ack {
548 reply = "ACK"
549 }
550 dc.SendMessage(&irc.Message{
551 Prefix: dc.srv.prefix(),
552 Command: "CAP",
553 Params: []string{replyTo, reply, args[0]},
554 })
555 case "END":
556 dc.negociatingCaps = false
557 default:
558 return ircError{&irc.Message{
559 Command: err_invalidcapcmd,
560 Params: []string{replyTo, cmd, "Unknown CAP command"},
561 }}
562 }
563 return nil
564}
565
// sanityCheckServer checks that a TLS connection to addr can be established
// within 30 seconds. The connection is closed right away; the dial error, or
// failing that the close error, is returned.
func sanityCheckServer(addr string) error {
	dialer := &net.Dialer{Timeout: 30 * time.Second}
	conn, err := tls.DialWithDialer(dialer, "tcp", addr, nil)
	if err == nil {
		err = conn.Close()
	}
	return err
}
574
// unmarshalUsername splits a raw username of the form "<username>/<network>"
// or "<username>@<network>". The username is everything before the first
// separator and the network everything after the last one. Without any
// separator, the whole string is the username and the network is empty.
func unmarshalUsername(rawUsername string) (username, network string) {
	const separators = "/@"

	first := strings.IndexAny(rawUsername, separators)
	if first < 0 {
		return rawUsername, ""
	}
	last := strings.LastIndexAny(rawUsername, separators)
	return rawUsername[:first], rawUsername[last+1:]
}
585
586func (dc *downstreamConn) authenticate(username, password string) error {
587 username, networkName := unmarshalUsername(username)
588
589 u, err := dc.srv.db.GetUser(username)
590 if err != nil {
591 dc.logger.Printf("failed authentication for %q: %v", username, err)
592 return errAuthFailed
593 }
594
595 err = bcrypt.CompareHashAndPassword([]byte(u.Password), []byte(password))
596 if err != nil {
597 dc.logger.Printf("failed authentication for %q: %v", username, err)
598 return errAuthFailed
599 }
600
601 dc.user = dc.srv.getUser(username)
602 if dc.user == nil {
603 dc.logger.Printf("failed authentication for %q: user not active", username)
604 return errAuthFailed
605 }
606 dc.networkName = networkName
607 return nil
608}
609
610func (dc *downstreamConn) register() error {
611 if dc.registered {
612 return fmt.Errorf("tried to register twice")
613 }
614
615 password := dc.password
616 dc.password = ""
617 if dc.user == nil {
618 if err := dc.authenticate(dc.rawUsername, password); err != nil {
619 return err
620 }
621 }
622
623 if dc.networkName == "" {
624 _, dc.networkName = unmarshalUsername(dc.rawUsername)
625 }
626
627 dc.registered = true
628 dc.username = dc.user.Username
629 dc.logger.Printf("registration complete for user %q", dc.username)
630 return nil
631}
632
633func (dc *downstreamConn) loadNetwork() error {
634 if dc.networkName == "" {
635 return nil
636 }
637
638 network := dc.user.getNetwork(dc.networkName)
639 if network == nil {
640 addr := dc.networkName
641 if !strings.ContainsRune(addr, ':') {
642 addr = addr + ":6697"
643 }
644
645 dc.logger.Printf("trying to connect to new network %q", addr)
646 if err := sanityCheckServer(addr); err != nil {
647 dc.logger.Printf("failed to connect to %q: %v", addr, err)
648 return ircError{&irc.Message{
649 Command: irc.ERR_PASSWDMISMATCH,
650 Params: []string{"*", fmt.Sprintf("Failed to connect to %q", dc.networkName)},
651 }}
652 }
653
654 dc.logger.Printf("auto-saving network %q", dc.networkName)
655 var err error
656 network, err = dc.user.createNetwork(&Network{
657 Addr: dc.networkName,
658 Nick: dc.nick,
659 })
660 if err != nil {
661 return err
662 }
663 }
664
665 dc.network = network
666 return nil
667}
668
669func (dc *downstreamConn) welcome() error {
670 if dc.user == nil || !dc.registered {
671 panic("tried to welcome an unregistered connection")
672 }
673
674 // TODO: doing this might take some time. We should do it in dc.register
675 // instead, but we'll potentially be adding a new network and this must be
676 // done in the user goroutine.
677 if err := dc.loadNetwork(); err != nil {
678 return err
679 }
680
681 firstDownstream := len(dc.user.downstreamConns) == 0
682
683 dc.SendMessage(&irc.Message{
684 Prefix: dc.srv.prefix(),
685 Command: irc.RPL_WELCOME,
686 Params: []string{dc.nick, "Welcome to soju, " + dc.nick},
687 })
688 dc.SendMessage(&irc.Message{
689 Prefix: dc.srv.prefix(),
690 Command: irc.RPL_YOURHOST,
691 Params: []string{dc.nick, "Your host is " + dc.srv.Hostname},
692 })
693 dc.SendMessage(&irc.Message{
694 Prefix: dc.srv.prefix(),
695 Command: irc.RPL_CREATED,
696 Params: []string{dc.nick, "Who cares when the server was created?"},
697 })
698 dc.SendMessage(&irc.Message{
699 Prefix: dc.srv.prefix(),
700 Command: irc.RPL_MYINFO,
701 Params: []string{dc.nick, dc.srv.Hostname, "soju", "aiwroO", "OovaimnqpsrtklbeI"},
702 })
703 // TODO: RPL_ISUPPORT
704 dc.SendMessage(&irc.Message{
705 Prefix: dc.srv.prefix(),
706 Command: irc.ERR_NOMOTD,
707 Params: []string{dc.nick, "No MOTD"},
708 })
709
710 dc.forEachUpstream(func(uc *upstreamConn) {
711 for _, ch := range uc.channels {
712 if ch.complete {
713 dc.SendMessage(&irc.Message{
714 Prefix: dc.prefix(),
715 Command: "JOIN",
716 Params: []string{dc.marshalChannel(ch.conn, ch.Name)},
717 })
718
719 forwardChannel(dc, ch)
720 }
721 }
722 })
723
724 dc.forEachNetwork(func(net *network) {
725 // TODO: need to take dc.network into account when deciding whether or
726 // not to load history
727 dc.runNetwork(net, firstDownstream)
728 })
729
730 return nil
731}
732
733// runNetwork starts listening for messages coming from the network's ring
734// buffer.
735//
736// It panics if the network is not suitable for the downstream connection.
737func (dc *downstreamConn) runNetwork(net *network, loadHistory bool) {
738 if dc.network != nil && net != dc.network {
739 panic("network not suitable for downstream connection")
740 }
741
742 historyName := dc.rawUsername
743
744 var seqPtr *uint64
745 if loadHistory {
746 net.lock.Lock()
747 seq, ok := net.history[historyName]
748 net.lock.Unlock()
749 if ok {
750 seqPtr = &seq
751 }
752 }
753
754 consumer, ch := net.ring.NewConsumer(seqPtr)
755 go func() {
756 for {
757 var closed bool
758 select {
759 case <-ch:
760 uc := net.upstream()
761 if uc == nil {
762 dc.logger.Printf("ignoring messages for upstream %q: upstream is disconnected", net.Addr)
763 break
764 }
765 dc.ringMessages <- ringMessage{consumer, uc}
766 case <-dc.closed:
767 closed = true
768 }
769 if closed {
770 break
771 }
772 }
773
774 seq := consumer.Close()
775
776 net.lock.Lock()
777 net.history[historyName] = seq
778 net.lock.Unlock()
779 }()
780}
781
782func (dc *downstreamConn) runUntilRegistered() error {
783 for !dc.registered {
784 msg, err := dc.irc.ReadMessage()
785 if err != nil {
786 return fmt.Errorf("failed to read IRC command: %v", err)
787 }
788
789 if dc.srv.Debug {
790 dc.logger.Printf("received: %v", msg)
791 }
792
793 err = dc.handleMessage(msg)
794 if ircErr, ok := err.(ircError); ok {
795 ircErr.Message.Prefix = dc.srv.prefix()
796 dc.SendMessage(ircErr.Message)
797 } else if err != nil {
798 return fmt.Errorf("failed to handle IRC command %q: %v", msg, err)
799 }
800 }
801
802 return nil
803}
804
805func (dc *downstreamConn) handleMessageRegistered(msg *irc.Message) error {
806 switch msg.Command {
807 case "CAP":
808 var subCmd string
809 if err := parseMessageParams(msg, &subCmd); err != nil {
810 return err
811 }
812 if err := dc.handleCapCommand(subCmd, msg.Params[1:]); err != nil {
813 return err
814 }
815 case "PING":
816 dc.SendMessage(&irc.Message{
817 Prefix: dc.srv.prefix(),
818 Command: "PONG",
819 Params: msg.Params,
820 })
821 return nil
822 case "USER":
823 return ircError{&irc.Message{
824 Command: irc.ERR_ALREADYREGISTERED,
825 Params: []string{dc.nick, "You may not reregister"},
826 }}
827 case "NICK":
828 var nick string
829 if err := parseMessageParams(msg, &nick); err != nil {
830 return err
831 }
832
833 var err error
834 dc.forEachNetwork(func(n *network) {
835 if err != nil {
836 return
837 }
838 n.Nick = nick
839 err = dc.srv.db.StoreNetwork(dc.user.Username, &n.Network)
840 })
841 if err != nil {
842 return err
843 }
844
845 dc.forEachUpstream(func(uc *upstreamConn) {
846 uc.SendMessage(msg)
847 })
848 case "JOIN":
849 var namesStr string
850 if err := parseMessageParams(msg, &namesStr); err != nil {
851 return err
852 }
853
854 var keys []string
855 if len(msg.Params) > 1 {
856 keys = strings.Split(msg.Params[1], ",")
857 }
858
859 for i, name := range strings.Split(namesStr, ",") {
860 uc, upstreamName, err := dc.unmarshalEntity(name)
861 if err != nil {
862 return err
863 }
864
865 var key string
866 if len(keys) > i {
867 key = keys[i]
868 }
869
870 params := []string{upstreamName}
871 if key != "" {
872 params = append(params, key)
873 }
874 uc.SendMessage(&irc.Message{
875 Command: "JOIN",
876 Params: params,
877 })
878
879 err = dc.srv.db.StoreChannel(uc.network.ID, &Channel{
880 Name: upstreamName,
881 Key: key,
882 })
883 if err != nil {
884 dc.logger.Printf("failed to create channel %q in DB: %v", upstreamName, err)
885 }
886 }
887 case "PART":
888 var namesStr string
889 if err := parseMessageParams(msg, &namesStr); err != nil {
890 return err
891 }
892
893 var reason string
894 if len(msg.Params) > 1 {
895 reason = msg.Params[1]
896 }
897
898 for _, name := range strings.Split(namesStr, ",") {
899 uc, upstreamName, err := dc.unmarshalEntity(name)
900 if err != nil {
901 return err
902 }
903
904 params := []string{upstreamName}
905 if reason != "" {
906 params = append(params, reason)
907 }
908 uc.SendMessage(&irc.Message{
909 Command: "PART",
910 Params: params,
911 })
912
913 if err := dc.srv.db.DeleteChannel(uc.network.ID, upstreamName); err != nil {
914 dc.logger.Printf("failed to delete channel %q in DB: %v", upstreamName, err)
915 }
916 }
917 case "KICK":
918 var channelStr, userStr string
919 if err := parseMessageParams(msg, &channelStr, &userStr); err != nil {
920 return err
921 }
922
923 channels := strings.Split(channelStr, ",")
924 users := strings.Split(userStr, ",")
925
926 var reason string
927 if len(msg.Params) > 2 {
928 reason = msg.Params[2]
929 }
930
931 if len(channels) != 1 && len(channels) != len(users) {
932 return ircError{&irc.Message{
933 Command: irc.ERR_BADCHANMASK,
934 Params: []string{dc.nick, channelStr, "Bad channel mask"},
935 }}
936 }
937
938 for i, user := range users {
939 var channel string
940 if len(channels) == 1 {
941 channel = channels[0]
942 } else {
943 channel = channels[i]
944 }
945
946 ucChannel, upstreamChannel, err := dc.unmarshalEntity(channel)
947 if err != nil {
948 return err
949 }
950
951 ucUser, upstreamUser, err := dc.unmarshalEntity(user)
952 if err != nil {
953 return err
954 }
955
956 if ucChannel != ucUser {
957 return ircError{&irc.Message{
958 Command: irc.ERR_USERNOTINCHANNEL,
959 Params: []string{dc.nick, user, channel, "They aren't on that channel"},
960 }}
961 }
962 uc := ucChannel
963
964 params := []string{upstreamChannel, upstreamUser}
965 if reason != "" {
966 params = append(params, reason)
967 }
968 uc.SendMessage(&irc.Message{
969 Command: "KICK",
970 Params: params,
971 })
972 }
973 case "MODE":
974 var name string
975 if err := parseMessageParams(msg, &name); err != nil {
976 return err
977 }
978
979 var modeStr string
980 if len(msg.Params) > 1 {
981 modeStr = msg.Params[1]
982 }
983
984 if name == dc.nick {
985 if modeStr != "" {
986 dc.forEachUpstream(func(uc *upstreamConn) {
987 uc.SendMessage(&irc.Message{
988 Command: "MODE",
989 Params: []string{uc.nick, modeStr},
990 })
991 })
992 } else {
993 dc.SendMessage(&irc.Message{
994 Prefix: dc.srv.prefix(),
995 Command: irc.RPL_UMODEIS,
996 Params: []string{dc.nick, ""}, // TODO
997 })
998 }
999 return nil
1000 }
1001
1002 uc, upstreamName, err := dc.unmarshalEntity(name)
1003 if err != nil {
1004 return err
1005 }
1006
1007 if !uc.isChannel(upstreamName) {
1008 return ircError{&irc.Message{
1009 Command: irc.ERR_USERSDONTMATCH,
1010 Params: []string{dc.nick, "Cannot change mode for other users"},
1011 }}
1012 }
1013
1014 if modeStr != "" {
1015 params := []string{upstreamName, modeStr}
1016 params = append(params, msg.Params[2:]...)
1017 uc.SendMessage(&irc.Message{
1018 Command: "MODE",
1019 Params: params,
1020 })
1021 } else {
1022 ch, ok := uc.channels[upstreamName]
1023 if !ok {
1024 return ircError{&irc.Message{
1025 Command: irc.ERR_NOSUCHCHANNEL,
1026 Params: []string{dc.nick, name, "No such channel"},
1027 }}
1028 }
1029
1030 if ch.modes == nil {
1031 // we haven't received the initial RPL_CHANNELMODEIS yet
1032 // ignore the request, we will broadcast the modes later when we receive RPL_CHANNELMODEIS
1033 return nil
1034 }
1035
1036 modeStr, modeParams := ch.modes.Format()
1037 params := []string{dc.nick, name, modeStr}
1038 params = append(params, modeParams...)
1039
1040 dc.SendMessage(&irc.Message{
1041 Prefix: dc.srv.prefix(),
1042 Command: irc.RPL_CHANNELMODEIS,
1043 Params: params,
1044 })
1045 if ch.creationTime != "" {
1046 dc.SendMessage(&irc.Message{
1047 Prefix: dc.srv.prefix(),
1048 Command: rpl_creationtime,
1049 Params: []string{dc.nick, name, ch.creationTime},
1050 })
1051 }
1052 }
1053 case "TOPIC":
1054 var channel string
1055 if err := parseMessageParams(msg, &channel); err != nil {
1056 return err
1057 }
1058
1059 uc, upstreamChannel, err := dc.unmarshalEntity(channel)
1060 if err != nil {
1061 return err
1062 }
1063
1064 if len(msg.Params) > 1 { // setting topic
1065 topic := msg.Params[1]
1066 uc.SendMessage(&irc.Message{
1067 Command: "TOPIC",
1068 Params: []string{upstreamChannel, topic},
1069 })
1070 } else { // getting topic
1071 ch, ok := uc.channels[upstreamChannel]
1072 if !ok {
1073 return ircError{&irc.Message{
1074 Command: irc.ERR_NOSUCHCHANNEL,
1075 Params: []string{dc.nick, upstreamChannel, "No such channel"},
1076 }}
1077 }
1078 sendTopic(dc, ch)
1079 }
1080 case "LIST":
1081 // TODO: support ELIST when supported by all upstreams
1082
1083 dc.user.pendingLISTsLock.Lock()
1084 defer dc.user.pendingLISTsLock.Unlock()
1085
1086 pl := pendingLIST{
1087 downstreamID: dc.id,
1088 pendingCommands: make(map[int64]*irc.Message),
1089 }
1090 var upstreamChannels map[int64][]string
1091 if len(msg.Params) > 0 {
1092 upstreamChannels = make(map[int64][]string)
1093 channels := strings.Split(msg.Params[0], ",")
1094 for _, channel := range channels {
1095 uc, upstreamChannel, err := dc.unmarshalEntity(channel)
1096 if err != nil {
1097 return err
1098 }
1099 upstreamChannels[uc.network.ID] = append(upstreamChannels[uc.network.ID], upstreamChannel)
1100 }
1101 }
1102
1103 dc.user.pendingLISTs = append(dc.user.pendingLISTs, pl)
1104 dc.forEachUpstream(func(uc *upstreamConn) {
1105 var params []string
1106 if upstreamChannels != nil {
1107 if channels, ok := upstreamChannels[uc.network.ID]; ok {
1108 params = []string{strings.Join(channels, ",")}
1109 } else {
1110 return
1111 }
1112 }
1113 pl.pendingCommands[uc.network.ID] = &irc.Message{
1114 Command: "LIST",
1115 Params: params,
1116 }
1117 uc.trySendList(dc.id)
1118 })
1119 case "NAMES":
1120 if len(msg.Params) == 0 {
1121 dc.SendMessage(&irc.Message{
1122 Prefix: dc.srv.prefix(),
1123 Command: irc.RPL_ENDOFNAMES,
1124 Params: []string{dc.nick, "*", "End of /NAMES list"},
1125 })
1126 return nil
1127 }
1128
1129 channels := strings.Split(msg.Params[0], ",")
1130 for _, channel := range channels {
1131 uc, upstreamChannel, err := dc.unmarshalEntity(channel)
1132 if err != nil {
1133 return err
1134 }
1135
1136 ch, ok := uc.channels[upstreamChannel]
1137 if ok {
1138 sendNames(dc, ch)
1139 } else {
1140 // NAMES on a channel we have not joined, ask upstream
1141 uc.SendMessageLabeled(dc.id, &irc.Message{
1142 Command: "NAMES",
1143 Params: []string{upstreamChannel},
1144 })
1145 }
1146 }
1147 case "WHO":
1148 if len(msg.Params) == 0 {
1149 // TODO: support WHO without parameters
1150 dc.SendMessage(&irc.Message{
1151 Prefix: dc.srv.prefix(),
1152 Command: irc.RPL_ENDOFWHO,
1153 Params: []string{dc.nick, "*", "End of /WHO list"},
1154 })
1155 return nil
1156 }
1157
1158 // TODO: support WHO masks
1159 entity := msg.Params[0]
1160
1161 if entity == dc.nick {
1162 // TODO: support AWAY (H/G) in self WHO reply
1163 dc.SendMessage(&irc.Message{
1164 Prefix: dc.srv.prefix(),
1165 Command: irc.RPL_WHOREPLY,
1166 Params: []string{dc.nick, "*", dc.username, dc.hostname, dc.srv.Hostname, dc.nick, "H", "0 " + dc.realname},
1167 })
1168 dc.SendMessage(&irc.Message{
1169 Prefix: dc.srv.prefix(),
1170 Command: irc.RPL_ENDOFWHO,
1171 Params: []string{dc.nick, dc.nick, "End of /WHO list"},
1172 })
1173 return nil
1174 }
1175
1176 uc, upstreamName, err := dc.unmarshalEntity(entity)
1177 if err != nil {
1178 return err
1179 }
1180
1181 var params []string
1182 if len(msg.Params) == 2 {
1183 params = []string{upstreamName, msg.Params[1]}
1184 } else {
1185 params = []string{upstreamName}
1186 }
1187
1188 uc.SendMessageLabeled(dc.id, &irc.Message{
1189 Command: "WHO",
1190 Params: params,
1191 })
1192 case "WHOIS":
1193 if len(msg.Params) == 0 {
1194 return ircError{&irc.Message{
1195 Command: irc.ERR_NONICKNAMEGIVEN,
1196 Params: []string{dc.nick, "No nickname given"},
1197 }}
1198 }
1199
1200 var target, mask string
1201 if len(msg.Params) == 1 {
1202 target = ""
1203 mask = msg.Params[0]
1204 } else {
1205 target = msg.Params[0]
1206 mask = msg.Params[1]
1207 }
1208 // TODO: support multiple WHOIS users
1209 if i := strings.IndexByte(mask, ','); i >= 0 {
1210 mask = mask[:i]
1211 }
1212
1213 if mask == dc.nick {
1214 dc.SendMessage(&irc.Message{
1215 Prefix: dc.srv.prefix(),
1216 Command: irc.RPL_WHOISUSER,
1217 Params: []string{dc.nick, dc.nick, dc.username, dc.hostname, "*", dc.realname},
1218 })
1219 dc.SendMessage(&irc.Message{
1220 Prefix: dc.srv.prefix(),
1221 Command: irc.RPL_WHOISSERVER,
1222 Params: []string{dc.nick, dc.nick, dc.srv.Hostname, "soju"},
1223 })
1224 dc.SendMessage(&irc.Message{
1225 Prefix: dc.srv.prefix(),
1226 Command: irc.RPL_ENDOFWHOIS,
1227 Params: []string{dc.nick, dc.nick, "End of /WHOIS list"},
1228 })
1229 return nil
1230 }
1231
1232 // TODO: support WHOIS masks
1233 uc, upstreamNick, err := dc.unmarshalEntity(mask)
1234 if err != nil {
1235 return err
1236 }
1237
1238 var params []string
1239 if target != "" {
1240 params = []string{target, upstreamNick}
1241 } else {
1242 params = []string{upstreamNick}
1243 }
1244
1245 uc.SendMessageLabeled(dc.id, &irc.Message{
1246 Command: "WHOIS",
1247 Params: params,
1248 })
1249 case "PRIVMSG":
1250 var targetsStr, text string
1251 if err := parseMessageParams(msg, &targetsStr, &text); err != nil {
1252 return err
1253 }
1254
1255 for _, name := range strings.Split(targetsStr, ",") {
1256 if name == serviceNick {
1257 handleServicePRIVMSG(dc, text)
1258 continue
1259 }
1260
1261 uc, upstreamName, err := dc.unmarshalEntity(name)
1262 if err != nil {
1263 return err
1264 }
1265
1266 if upstreamName == "NickServ" {
1267 dc.handleNickServPRIVMSG(uc, text)
1268 }
1269
1270 uc.SendMessage(&irc.Message{
1271 Command: "PRIVMSG",
1272 Params: []string{upstreamName, text},
1273 })
1274
1275 echoMsg := &irc.Message{
1276 Prefix: &irc.Prefix{
1277 Name: uc.nick,
1278 User: uc.username,
1279 },
1280 Command: "PRIVMSG",
1281 Params: []string{upstreamName, text},
1282 }
1283 dc.lock.Lock()
1284 dc.ourMessages[echoMsg] = struct{}{}
1285 dc.lock.Unlock()
1286
1287 uc.network.ring.Produce(echoMsg)
1288 }
1289 case "NOTICE":
1290 var targetsStr, text string
1291 if err := parseMessageParams(msg, &targetsStr, &text); err != nil {
1292 return err
1293 }
1294
1295 for _, name := range strings.Split(targetsStr, ",") {
1296 uc, upstreamName, err := dc.unmarshalEntity(name)
1297 if err != nil {
1298 return err
1299 }
1300
1301 uc.SendMessage(&irc.Message{
1302 Command: "NOTICE",
1303 Params: []string{upstreamName, text},
1304 })
1305 }
1306 case "INVITE":
1307 var user, channel string
1308 if err := parseMessageParams(msg, &user, &channel); err != nil {
1309 return err
1310 }
1311
1312 ucChannel, upstreamChannel, err := dc.unmarshalEntity(channel)
1313 if err != nil {
1314 return err
1315 }
1316
1317 ucUser, upstreamUser, err := dc.unmarshalEntity(user)
1318 if err != nil {
1319 return err
1320 }
1321
1322 if ucChannel != ucUser {
1323 return ircError{&irc.Message{
1324 Command: irc.ERR_USERNOTINCHANNEL,
1325 Params: []string{dc.nick, user, channel, "They aren't on that channel"},
1326 }}
1327 }
1328 uc := ucChannel
1329
1330 uc.SendMessageLabeled(dc.id, &irc.Message{
1331 Command: "INVITE",
1332 Params: []string{upstreamUser, upstreamChannel},
1333 })
1334 default:
1335 dc.logger.Printf("unhandled message: %v", msg)
1336 return newUnknownCommandError(msg.Command)
1337 }
1338 return nil
1339}
1340
1341func (dc *downstreamConn) handleNickServPRIVMSG(uc *upstreamConn, text string) {
1342 username, password, ok := parseNickServCredentials(text, uc.nick)
1343 if !ok {
1344 return
1345 }
1346
1347 dc.logger.Printf("auto-saving NickServ credentials with username %q", username)
1348 n := uc.network
1349 n.SASL.Mechanism = "PLAIN"
1350 n.SASL.Plain.Username = username
1351 n.SASL.Plain.Password = password
1352 if err := dc.srv.db.StoreNetwork(dc.user.Username, &n.Network); err != nil {
1353 dc.logger.Printf("failed to save NickServ credentials: %v", err)
1354 }
1355}
1356
// parseNickServCredentials extracts account credentials from the text of a
// message addressed to NickServ.
//
// text is split on whitespace; the first field is the command (matched
// case-insensitively) and the remaining fields are its parameters. The
// following forms are recognized:
//
//	REGISTER <password> [...]           username is nick
//	IDENTIFY <password>                 username is nick
//	IDENTIFY <username> <password> ...  username is taken from the message
//
// nick is the fallback username used when the message does not carry one.
//
// ok is false, and username and password are empty, when text has fewer than
// two fields or when the command is not one of the forms above.
func parseNickServCredentials(text, nick string) (username, password string, ok bool) {
	fields := strings.Fields(text)
	if len(fields) < 2 {
		return "", "", false
	}

	// len(fields) >= 2 guarantees params has at least one element.
	params := fields[1:]
	switch strings.ToUpper(fields[0]) {
	case "REGISTER":
		return nick, params[0], true
	case "IDENTIFY":
		if len(params) == 1 {
			// Only a password was given: it is the sole parameter, and the
			// account name defaults to the current nick.
			return nick, params[0], true
		}
		return params[0], params[1], true
	default:
		// Any other NickServ command carries no credentials. Reporting
		// ok=true here would make the caller store empty credentials.
		return "", "", false
	}
}
Note: See TracBrowser for help on using the repository browser.