source: code/trunk/upstream.go@ 215

Last change on this file since 215 was 215, checked in by contact, 5 years ago

Introduce messageLogger

This centralizes formatting related to message logging in a single
place.

File size: 34.3 KB
Line 
1package soju
2
3import (
4 "crypto/tls"
5 "encoding/base64"
6 "errors"
7 "fmt"
8 "io"
9 "net"
10 "os"
11 "strconv"
12 "strings"
13 "time"
14
15 "github.com/emersion/go-sasl"
16 "gopkg.in/irc.v3"
17)
18
19type upstreamChannel struct {
20 Name string
21 conn *upstreamConn
22 Topic string
23 TopicWho string
24 TopicTime time.Time
25 Status channelStatus
26 modes channelModes
27 creationTime string
28 Members map[string]*membership
29 complete bool
30}
31
32type upstreamConn struct {
33 conn
34
35 network *network
36 user *user
37
38 serverName string
39 availableUserModes string
40 availableChannelModes map[byte]channelModeType
41 availableChannelTypes string
42 availableMemberships []membership
43
44 registered bool
45 nick string
46 username string
47 realname string
48 modes userModes
49 channels map[string]*upstreamChannel
50 caps map[string]string
51 batches map[string]batch
52 away bool
53
54 tagsSupported bool
55 labelsSupported bool
56 nextLabelID uint64
57
58 saslClient sasl.Client
59 saslStarted bool
60
61 // set of LIST commands in progress, per downstream
62 pendingLISTDownstreamSet map[uint64]struct{}
63
64 messageLoggers map[string]*messageLogger
65}
66
// entityLog pairs an entity name with an open file.
//
// NOTE(review): nothing in the visible part of this file uses this type;
// confirm whether it is still needed.
type entityLog struct {
	name string
	file *os.File
}
71
72func connectToUpstream(network *network) (*upstreamConn, error) {
73 logger := &prefixLogger{network.user.srv.Logger, fmt.Sprintf("upstream %q: ", network.Addr)}
74
75 addr := network.Addr
76 if !strings.ContainsRune(addr, ':') {
77 addr = addr + ":6697"
78 }
79
80 dialer := net.Dialer{Timeout: connectTimeout}
81
82 logger.Printf("connecting to TLS server at address %q", addr)
83 netConn, err := tls.DialWithDialer(&dialer, "tcp", addr, nil)
84 if err != nil {
85 return nil, fmt.Errorf("failed to dial %q: %v", addr, err)
86 }
87
88 uc := &upstreamConn{
89 conn: *newConn(network.user.srv, netConn, logger),
90 network: network,
91 user: network.user,
92 channels: make(map[string]*upstreamChannel),
93 caps: make(map[string]string),
94 batches: make(map[string]batch),
95 availableChannelTypes: stdChannelTypes,
96 availableChannelModes: stdChannelModes,
97 availableMemberships: stdMemberships,
98 pendingLISTDownstreamSet: make(map[uint64]struct{}),
99 messageLoggers: make(map[string]*messageLogger),
100 }
101
102 return uc, nil
103}
104
105func (uc *upstreamConn) forEachDownstream(f func(*downstreamConn)) {
106 uc.user.forEachDownstream(func(dc *downstreamConn) {
107 if dc.network != nil && dc.network != uc.network {
108 return
109 }
110 f(dc)
111 })
112}
113
114func (uc *upstreamConn) forEachDownstreamByID(id uint64, f func(*downstreamConn)) {
115 uc.forEachDownstream(func(dc *downstreamConn) {
116 if id != 0 && id != dc.id {
117 return
118 }
119 f(dc)
120 })
121}
122
123func (uc *upstreamConn) getChannel(name string) (*upstreamChannel, error) {
124 ch, ok := uc.channels[name]
125 if !ok {
126 return nil, fmt.Errorf("unknown channel %q", name)
127 }
128 return ch, nil
129}
130
131func (uc *upstreamConn) isChannel(entity string) bool {
132 if i := strings.IndexByte(uc.availableChannelTypes, entity[0]); i >= 0 {
133 return true
134 }
135 return false
136}
137
138func (uc *upstreamConn) getPendingLIST() *pendingLIST {
139 for _, pl := range uc.user.pendingLISTs {
140 if _, ok := pl.pendingCommands[uc.network.ID]; !ok {
141 continue
142 }
143 return &pl
144 }
145 return nil
146}
147
148func (uc *upstreamConn) endPendingLISTs(all bool) (found bool) {
149 found = false
150 for i := 0; i < len(uc.user.pendingLISTs); i++ {
151 pl := uc.user.pendingLISTs[i]
152 if _, ok := pl.pendingCommands[uc.network.ID]; !ok {
153 continue
154 }
155 delete(pl.pendingCommands, uc.network.ID)
156 if len(pl.pendingCommands) == 0 {
157 uc.user.pendingLISTs = append(uc.user.pendingLISTs[:i], uc.user.pendingLISTs[i+1:]...)
158 i--
159 uc.forEachDownstreamByID(pl.downstreamID, func(dc *downstreamConn) {
160 dc.SendMessage(&irc.Message{
161 Prefix: dc.srv.prefix(),
162 Command: irc.RPL_LISTEND,
163 Params: []string{dc.nick, "End of /LIST"},
164 })
165 })
166 }
167 found = true
168 if !all {
169 delete(uc.pendingLISTDownstreamSet, pl.downstreamID)
170 uc.user.forEachUpstream(func(uc *upstreamConn) {
171 uc.trySendLIST(pl.downstreamID)
172 })
173 return
174 }
175 }
176 return
177}
178
179func (uc *upstreamConn) trySendLIST(downstreamID uint64) {
180 // must be called with a lock in uc.user.pendingLISTsLock
181
182 if _, ok := uc.pendingLISTDownstreamSet[downstreamID]; ok {
183 // a LIST command is already pending
184 // we will try again when that command is completed
185 return
186 }
187
188 for _, pl := range uc.user.pendingLISTs {
189 if pl.downstreamID != downstreamID {
190 continue
191 }
192 // this is the first pending LIST command list of the downstream
193 listCommand, ok := pl.pendingCommands[uc.network.ID]
194 if !ok {
195 // there is no command for this upstream in these LIST commands
196 // do not send anything
197 continue
198 }
199 // there is a command for this upstream in these LIST commands
200 // send it now
201
202 uc.SendMessageLabeled(downstreamID, listCommand)
203
204 uc.pendingLISTDownstreamSet[downstreamID] = struct{}{}
205 return
206 }
207}
208
209func (uc *upstreamConn) parseMembershipPrefix(s string) (membership *membership, nick string) {
210 for _, m := range uc.availableMemberships {
211 if m.Prefix == s[0] {
212 return &m, s[1:]
213 }
214 }
215 return nil, s
216}
217
218func (uc *upstreamConn) handleMessage(msg *irc.Message) error {
219 var label string
220 if l, ok := msg.GetTag("label"); ok {
221 label = l
222 }
223
224 var msgBatch *batch
225 if batchName, ok := msg.GetTag("batch"); ok {
226 b, ok := uc.batches[batchName]
227 if !ok {
228 return fmt.Errorf("unexpected batch reference: batch was not defined: %q", batchName)
229 }
230 msgBatch = &b
231 if label == "" {
232 label = msgBatch.Label
233 }
234 }
235
236 var downstreamID uint64 = 0
237 if label != "" {
238 var labelOffset uint64
239 n, err := fmt.Sscanf(label, "sd-%d-%d", &downstreamID, &labelOffset)
240 if err == nil && n < 2 {
241 err = errors.New("not enough arguments")
242 }
243 if err != nil {
244 return fmt.Errorf("unexpected message label: invalid downstream reference for label %q: %v", label, err)
245 }
246 }
247
248 switch msg.Command {
249 case "PING":
250 uc.SendMessage(&irc.Message{
251 Command: "PONG",
252 Params: msg.Params,
253 })
254 return nil
255 case "NOTICE":
256 uc.logger.Print(msg)
257
258 if msg.Prefix.User == "" && msg.Prefix.Host == "" { // server message
259 uc.forEachDownstream(func(dc *downstreamConn) {
260 dc.SendMessage(&irc.Message{
261 Command: "NOTICE",
262 Params: msg.Params,
263 })
264 })
265 } else { // regular user NOTICE
266 var nick, text string
267 if err := parseMessageParams(msg, &nick, &text); err != nil {
268 return err
269 }
270
271 target := nick
272 if nick == uc.nick {
273 target = msg.Prefix.Name
274 }
275 uc.appendLog(target, msg)
276
277 uc.forEachDownstream(func(dc *downstreamConn) {
278 dc.SendMessage(&irc.Message{
279 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
280 Command: "NOTICE",
281 Params: []string{dc.marshalEntity(uc, nick), text},
282 })
283 })
284 }
285 case "CAP":
286 var subCmd string
287 if err := parseMessageParams(msg, nil, &subCmd); err != nil {
288 return err
289 }
290 subCmd = strings.ToUpper(subCmd)
291 subParams := msg.Params[2:]
292 switch subCmd {
293 case "LS":
294 if len(subParams) < 1 {
295 return newNeedMoreParamsError(msg.Command)
296 }
297 caps := strings.Fields(subParams[len(subParams)-1])
298 more := len(subParams) >= 2 && msg.Params[len(subParams)-2] == "*"
299
300 for _, s := range caps {
301 kv := strings.SplitN(s, "=", 2)
302 k := strings.ToLower(kv[0])
303 var v string
304 if len(kv) == 2 {
305 v = kv[1]
306 }
307 uc.caps[k] = v
308 }
309
310 if more {
311 break // wait to receive all capabilities
312 }
313
314 requestCaps := make([]string, 0, 16)
315 for _, c := range []string{"message-tags", "batch", "labeled-response", "server-time"} {
316 if _, ok := uc.caps[c]; ok {
317 requestCaps = append(requestCaps, c)
318 }
319 }
320
321 if uc.requestSASL() {
322 requestCaps = append(requestCaps, "sasl")
323 }
324
325 if len(requestCaps) > 0 {
326 uc.SendMessage(&irc.Message{
327 Command: "CAP",
328 Params: []string{"REQ", strings.Join(requestCaps, " ")},
329 })
330 }
331
332 if uc.requestSASL() {
333 break // we'll send CAP END after authentication is completed
334 }
335
336 uc.SendMessage(&irc.Message{
337 Command: "CAP",
338 Params: []string{"END"},
339 })
340 case "ACK", "NAK":
341 if len(subParams) < 1 {
342 return newNeedMoreParamsError(msg.Command)
343 }
344 caps := strings.Fields(subParams[0])
345
346 for _, name := range caps {
347 if err := uc.handleCapAck(strings.ToLower(name), subCmd == "ACK"); err != nil {
348 return err
349 }
350 }
351
352 if uc.saslClient == nil {
353 uc.SendMessage(&irc.Message{
354 Command: "CAP",
355 Params: []string{"END"},
356 })
357 }
358 default:
359 uc.logger.Printf("unhandled message: %v", msg)
360 }
361 case "AUTHENTICATE":
362 if uc.saslClient == nil {
363 return fmt.Errorf("received unexpected AUTHENTICATE message")
364 }
365
366 // TODO: if a challenge is 400 bytes long, buffer it
367 var challengeStr string
368 if err := parseMessageParams(msg, &challengeStr); err != nil {
369 uc.SendMessage(&irc.Message{
370 Command: "AUTHENTICATE",
371 Params: []string{"*"},
372 })
373 return err
374 }
375
376 var challenge []byte
377 if challengeStr != "+" {
378 var err error
379 challenge, err = base64.StdEncoding.DecodeString(challengeStr)
380 if err != nil {
381 uc.SendMessage(&irc.Message{
382 Command: "AUTHENTICATE",
383 Params: []string{"*"},
384 })
385 return err
386 }
387 }
388
389 var resp []byte
390 var err error
391 if !uc.saslStarted {
392 _, resp, err = uc.saslClient.Start()
393 uc.saslStarted = true
394 } else {
395 resp, err = uc.saslClient.Next(challenge)
396 }
397 if err != nil {
398 uc.SendMessage(&irc.Message{
399 Command: "AUTHENTICATE",
400 Params: []string{"*"},
401 })
402 return err
403 }
404
405 // TODO: send response in multiple chunks if >= 400 bytes
406 var respStr = "+"
407 if resp != nil {
408 respStr = base64.StdEncoding.EncodeToString(resp)
409 }
410
411 uc.SendMessage(&irc.Message{
412 Command: "AUTHENTICATE",
413 Params: []string{respStr},
414 })
415 case irc.RPL_LOGGEDIN:
416 var account string
417 if err := parseMessageParams(msg, nil, nil, &account); err != nil {
418 return err
419 }
420 uc.logger.Printf("logged in with account %q", account)
421 case irc.RPL_LOGGEDOUT:
422 uc.logger.Printf("logged out")
423 case irc.ERR_NICKLOCKED, irc.RPL_SASLSUCCESS, irc.ERR_SASLFAIL, irc.ERR_SASLTOOLONG, irc.ERR_SASLABORTED:
424 var info string
425 if err := parseMessageParams(msg, nil, &info); err != nil {
426 return err
427 }
428 switch msg.Command {
429 case irc.ERR_NICKLOCKED:
430 uc.logger.Printf("invalid nick used with SASL authentication: %v", info)
431 case irc.ERR_SASLFAIL:
432 uc.logger.Printf("SASL authentication failed: %v", info)
433 case irc.ERR_SASLTOOLONG:
434 uc.logger.Printf("SASL message too long: %v", info)
435 }
436
437 uc.saslClient = nil
438 uc.saslStarted = false
439
440 uc.SendMessage(&irc.Message{
441 Command: "CAP",
442 Params: []string{"END"},
443 })
444 case irc.RPL_WELCOME:
445 uc.registered = true
446 uc.logger.Printf("connection registered")
447
448 channels, err := uc.srv.db.ListChannels(uc.network.ID)
449 if err != nil {
450 uc.logger.Printf("failed to list channels from database: %v", err)
451 break
452 }
453
454 for _, ch := range channels {
455 params := []string{ch.Name}
456 if ch.Key != "" {
457 params = append(params, ch.Key)
458 }
459 uc.SendMessage(&irc.Message{
460 Command: "JOIN",
461 Params: params,
462 })
463 }
464 case irc.RPL_MYINFO:
465 if err := parseMessageParams(msg, nil, &uc.serverName, nil, &uc.availableUserModes, nil); err != nil {
466 return err
467 }
468 case irc.RPL_ISUPPORT:
469 if err := parseMessageParams(msg, nil, nil); err != nil {
470 return err
471 }
472 for _, token := range msg.Params[1 : len(msg.Params)-1] {
473 negate := false
474 parameter := token
475 value := ""
476 if strings.HasPrefix(token, "-") {
477 negate = true
478 token = token[1:]
479 } else {
480 if i := strings.IndexByte(token, '='); i >= 0 {
481 parameter = token[:i]
482 value = token[i+1:]
483 }
484 }
485 if !negate {
486 switch parameter {
487 case "CHANMODES":
488 parts := strings.SplitN(value, ",", 5)
489 if len(parts) < 4 {
490 return fmt.Errorf("malformed ISUPPORT CHANMODES value: %v", value)
491 }
492 modes := make(map[byte]channelModeType)
493 for i, mt := range []channelModeType{modeTypeA, modeTypeB, modeTypeC, modeTypeD} {
494 for j := 0; j < len(parts[i]); j++ {
495 mode := parts[i][j]
496 modes[mode] = mt
497 }
498 }
499 uc.availableChannelModes = modes
500 case "CHANTYPES":
501 uc.availableChannelTypes = value
502 case "PREFIX":
503 if value == "" {
504 uc.availableMemberships = nil
505 } else {
506 if value[0] != '(' {
507 return fmt.Errorf("malformed ISUPPORT PREFIX value: %v", value)
508 }
509 sep := strings.IndexByte(value, ')')
510 if sep < 0 || len(value) != sep*2 {
511 return fmt.Errorf("malformed ISUPPORT PREFIX value: %v", value)
512 }
513 memberships := make([]membership, len(value)/2-1)
514 for i := range memberships {
515 memberships[i] = membership{
516 Mode: value[i+1],
517 Prefix: value[sep+i+1],
518 }
519 }
520 uc.availableMemberships = memberships
521 }
522 }
523 } else {
524 // TODO: handle ISUPPORT negations
525 }
526 }
527 case "BATCH":
528 var tag string
529 if err := parseMessageParams(msg, &tag); err != nil {
530 return err
531 }
532
533 if strings.HasPrefix(tag, "+") {
534 tag = tag[1:]
535 if _, ok := uc.batches[tag]; ok {
536 return fmt.Errorf("unexpected BATCH reference tag: batch was already defined: %q", tag)
537 }
538 var batchType string
539 if err := parseMessageParams(msg, nil, &batchType); err != nil {
540 return err
541 }
542 label := label
543 if label == "" && msgBatch != nil {
544 label = msgBatch.Label
545 }
546 uc.batches[tag] = batch{
547 Type: batchType,
548 Params: msg.Params[2:],
549 Outer: msgBatch,
550 Label: label,
551 }
552 } else if strings.HasPrefix(tag, "-") {
553 tag = tag[1:]
554 if _, ok := uc.batches[tag]; !ok {
555 return fmt.Errorf("unknown BATCH reference tag: %q", tag)
556 }
557 delete(uc.batches, tag)
558 } else {
559 return fmt.Errorf("unexpected BATCH reference tag: missing +/- prefix: %q", tag)
560 }
561 case "NICK":
562 if msg.Prefix == nil {
563 return fmt.Errorf("expected a prefix")
564 }
565
566 var newNick string
567 if err := parseMessageParams(msg, &newNick); err != nil {
568 return err
569 }
570
571 if msg.Prefix.Name == uc.nick {
572 uc.logger.Printf("changed nick from %q to %q", uc.nick, newNick)
573 uc.nick = newNick
574 }
575
576 for _, ch := range uc.channels {
577 if membership, ok := ch.Members[msg.Prefix.Name]; ok {
578 delete(ch.Members, msg.Prefix.Name)
579 ch.Members[newNick] = membership
580 uc.appendLog(ch.Name, msg)
581 }
582 }
583
584 if msg.Prefix.Name != uc.nick {
585 uc.forEachDownstream(func(dc *downstreamConn) {
586 dc.SendMessage(&irc.Message{
587 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
588 Command: "NICK",
589 Params: []string{newNick},
590 })
591 })
592 }
593 case "JOIN":
594 if msg.Prefix == nil {
595 return fmt.Errorf("expected a prefix")
596 }
597
598 var channels string
599 if err := parseMessageParams(msg, &channels); err != nil {
600 return err
601 }
602
603 for _, ch := range strings.Split(channels, ",") {
604 if msg.Prefix.Name == uc.nick {
605 uc.logger.Printf("joined channel %q", ch)
606 uc.channels[ch] = &upstreamChannel{
607 Name: ch,
608 conn: uc,
609 Members: make(map[string]*membership),
610 }
611
612 uc.SendMessage(&irc.Message{
613 Command: "MODE",
614 Params: []string{ch},
615 })
616 } else {
617 ch, err := uc.getChannel(ch)
618 if err != nil {
619 return err
620 }
621 ch.Members[msg.Prefix.Name] = nil
622 }
623
624 uc.appendLog(ch, msg)
625
626 uc.forEachDownstream(func(dc *downstreamConn) {
627 dc.SendMessage(&irc.Message{
628 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
629 Command: "JOIN",
630 Params: []string{dc.marshalChannel(uc, ch)},
631 })
632 })
633 }
634 case "PART":
635 if msg.Prefix == nil {
636 return fmt.Errorf("expected a prefix")
637 }
638
639 var channels string
640 if err := parseMessageParams(msg, &channels); err != nil {
641 return err
642 }
643
644 var reason string
645 if len(msg.Params) > 1 {
646 reason = msg.Params[1]
647 }
648
649 for _, ch := range strings.Split(channels, ",") {
650 if msg.Prefix.Name == uc.nick {
651 uc.logger.Printf("parted channel %q", ch)
652 delete(uc.channels, ch)
653 } else {
654 ch, err := uc.getChannel(ch)
655 if err != nil {
656 return err
657 }
658 delete(ch.Members, msg.Prefix.Name)
659 }
660
661 uc.appendLog(ch, msg)
662
663 uc.forEachDownstream(func(dc *downstreamConn) {
664 params := []string{dc.marshalChannel(uc, ch)}
665 if reason != "" {
666 params = append(params, reason)
667 }
668 dc.SendMessage(&irc.Message{
669 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
670 Command: "PART",
671 Params: params,
672 })
673 })
674 }
675 case "KICK":
676 if msg.Prefix == nil {
677 return fmt.Errorf("expected a prefix")
678 }
679
680 var channel, user string
681 if err := parseMessageParams(msg, &channel, &user); err != nil {
682 return err
683 }
684
685 var reason string
686 if len(msg.Params) > 2 {
687 reason = msg.Params[2]
688 }
689
690 if user == uc.nick {
691 uc.logger.Printf("kicked from channel %q by %s", channel, msg.Prefix.Name)
692 delete(uc.channels, channel)
693 } else {
694 ch, err := uc.getChannel(channel)
695 if err != nil {
696 return err
697 }
698 delete(ch.Members, user)
699 }
700
701 uc.appendLog(channel, msg)
702
703 uc.forEachDownstream(func(dc *downstreamConn) {
704 params := []string{dc.marshalChannel(uc, channel), dc.marshalNick(uc, user)}
705 if reason != "" {
706 params = append(params, reason)
707 }
708 dc.SendMessage(&irc.Message{
709 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
710 Command: "KICK",
711 Params: params,
712 })
713 })
714 case "QUIT":
715 if msg.Prefix == nil {
716 return fmt.Errorf("expected a prefix")
717 }
718
719 if msg.Prefix.Name == uc.nick {
720 uc.logger.Printf("quit")
721 }
722
723 for _, ch := range uc.channels {
724 if _, ok := ch.Members[msg.Prefix.Name]; ok {
725 delete(ch.Members, msg.Prefix.Name)
726
727 uc.appendLog(ch.Name, msg)
728 }
729 }
730
731 if msg.Prefix.Name != uc.nick {
732 uc.forEachDownstream(func(dc *downstreamConn) {
733 dc.SendMessage(&irc.Message{
734 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
735 Command: "QUIT",
736 Params: msg.Params,
737 })
738 })
739 }
740 case irc.RPL_TOPIC, irc.RPL_NOTOPIC:
741 var name, topic string
742 if err := parseMessageParams(msg, nil, &name, &topic); err != nil {
743 return err
744 }
745 ch, err := uc.getChannel(name)
746 if err != nil {
747 return err
748 }
749 if msg.Command == irc.RPL_TOPIC {
750 ch.Topic = topic
751 } else {
752 ch.Topic = ""
753 }
754 case "TOPIC":
755 var name string
756 if err := parseMessageParams(msg, &name); err != nil {
757 return err
758 }
759 ch, err := uc.getChannel(name)
760 if err != nil {
761 return err
762 }
763 if len(msg.Params) > 1 {
764 ch.Topic = msg.Params[1]
765 } else {
766 ch.Topic = ""
767 }
768 uc.forEachDownstream(func(dc *downstreamConn) {
769 params := []string{dc.marshalChannel(uc, name)}
770 if ch.Topic != "" {
771 params = append(params, ch.Topic)
772 }
773 dc.SendMessage(&irc.Message{
774 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
775 Command: "TOPIC",
776 Params: params,
777 })
778 })
779 case "MODE":
780 var name, modeStr string
781 if err := parseMessageParams(msg, &name, &modeStr); err != nil {
782 return err
783 }
784
785 if !uc.isChannel(name) { // user mode change
786 if name != uc.nick {
787 return fmt.Errorf("received MODE message for unknown nick %q", name)
788 }
789 return uc.modes.Apply(modeStr)
790 // TODO: notify downstreams about user mode change?
791 } else { // channel mode change
792 ch, err := uc.getChannel(name)
793 if err != nil {
794 return err
795 }
796
797 if ch.modes != nil {
798 if err := ch.modes.Apply(uc.availableChannelModes, modeStr, msg.Params[2:]...); err != nil {
799 return err
800 }
801 }
802
803 uc.appendLog(ch.Name, msg)
804
805 uc.forEachDownstream(func(dc *downstreamConn) {
806 params := []string{dc.marshalChannel(uc, name), modeStr}
807 params = append(params, msg.Params[2:]...)
808
809 dc.SendMessage(&irc.Message{
810 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
811 Command: "MODE",
812 Params: params,
813 })
814 })
815 }
816 case irc.RPL_UMODEIS:
817 if err := parseMessageParams(msg, nil); err != nil {
818 return err
819 }
820 modeStr := ""
821 if len(msg.Params) > 1 {
822 modeStr = msg.Params[1]
823 }
824
825 uc.modes = ""
826 if err := uc.modes.Apply(modeStr); err != nil {
827 return err
828 }
829 // TODO: send RPL_UMODEIS to downstream connections when applicable
830 case irc.RPL_CHANNELMODEIS:
831 var channel string
832 if err := parseMessageParams(msg, nil, &channel); err != nil {
833 return err
834 }
835 modeStr := ""
836 if len(msg.Params) > 2 {
837 modeStr = msg.Params[2]
838 }
839
840 ch, err := uc.getChannel(channel)
841 if err != nil {
842 return err
843 }
844
845 firstMode := ch.modes == nil
846 ch.modes = make(map[byte]string)
847 if err := ch.modes.Apply(uc.availableChannelModes, modeStr, msg.Params[3:]...); err != nil {
848 return err
849 }
850 if firstMode {
851 modeStr, modeParams := ch.modes.Format()
852
853 uc.forEachDownstream(func(dc *downstreamConn) {
854 params := []string{dc.nick, dc.marshalChannel(uc, channel), modeStr}
855 params = append(params, modeParams...)
856
857 dc.SendMessage(&irc.Message{
858 Prefix: dc.srv.prefix(),
859 Command: irc.RPL_CHANNELMODEIS,
860 Params: params,
861 })
862 })
863 }
864 case rpl_creationtime:
865 var channel, creationTime string
866 if err := parseMessageParams(msg, nil, &channel, &creationTime); err != nil {
867 return err
868 }
869
870 ch, err := uc.getChannel(channel)
871 if err != nil {
872 return err
873 }
874
875 firstCreationTime := ch.creationTime == ""
876 ch.creationTime = creationTime
877 if firstCreationTime {
878 uc.forEachDownstream(func(dc *downstreamConn) {
879 dc.SendMessage(&irc.Message{
880 Prefix: dc.srv.prefix(),
881 Command: rpl_creationtime,
882 Params: []string{dc.nick, channel, creationTime},
883 })
884 })
885 }
886 case rpl_topicwhotime:
887 var name, who, timeStr string
888 if err := parseMessageParams(msg, nil, &name, &who, &timeStr); err != nil {
889 return err
890 }
891 ch, err := uc.getChannel(name)
892 if err != nil {
893 return err
894 }
895 ch.TopicWho = who
896 sec, err := strconv.ParseInt(timeStr, 10, 64)
897 if err != nil {
898 return fmt.Errorf("failed to parse topic time: %v", err)
899 }
900 ch.TopicTime = time.Unix(sec, 0)
901 case irc.RPL_LIST:
902 var channel, clients, topic string
903 if err := parseMessageParams(msg, nil, &channel, &clients, &topic); err != nil {
904 return err
905 }
906
907 pl := uc.getPendingLIST()
908 if pl == nil {
909 return fmt.Errorf("unexpected RPL_LIST: no matching pending LIST")
910 }
911
912 uc.forEachDownstreamByID(pl.downstreamID, func(dc *downstreamConn) {
913 dc.SendMessage(&irc.Message{
914 Prefix: dc.srv.prefix(),
915 Command: irc.RPL_LIST,
916 Params: []string{dc.nick, dc.marshalChannel(uc, channel), clients, topic},
917 })
918 })
919 case irc.RPL_LISTEND:
920 ok := uc.endPendingLISTs(false)
921 if !ok {
922 return fmt.Errorf("unexpected RPL_LISTEND: no matching pending LIST")
923 }
924 case irc.RPL_NAMREPLY:
925 var name, statusStr, members string
926 if err := parseMessageParams(msg, nil, &statusStr, &name, &members); err != nil {
927 return err
928 }
929
930 ch, ok := uc.channels[name]
931 if !ok {
932 // NAMES on a channel we have not joined, forward to downstream
933 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
934 channel := dc.marshalChannel(uc, name)
935 members := splitSpace(members)
936 for i, member := range members {
937 membership, nick := uc.parseMembershipPrefix(member)
938 members[i] = membership.String() + dc.marshalNick(uc, nick)
939 }
940 memberStr := strings.Join(members, " ")
941
942 dc.SendMessage(&irc.Message{
943 Prefix: dc.srv.prefix(),
944 Command: irc.RPL_NAMREPLY,
945 Params: []string{dc.nick, statusStr, channel, memberStr},
946 })
947 })
948 return nil
949 }
950
951 status, err := parseChannelStatus(statusStr)
952 if err != nil {
953 return err
954 }
955 ch.Status = status
956
957 for _, s := range splitSpace(members) {
958 membership, nick := uc.parseMembershipPrefix(s)
959 ch.Members[nick] = membership
960 }
961 case irc.RPL_ENDOFNAMES:
962 var name string
963 if err := parseMessageParams(msg, nil, &name); err != nil {
964 return err
965 }
966
967 ch, ok := uc.channels[name]
968 if !ok {
969 // NAMES on a channel we have not joined, forward to downstream
970 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
971 channel := dc.marshalChannel(uc, name)
972
973 dc.SendMessage(&irc.Message{
974 Prefix: dc.srv.prefix(),
975 Command: irc.RPL_ENDOFNAMES,
976 Params: []string{dc.nick, channel, "End of /NAMES list"},
977 })
978 })
979 return nil
980 }
981
982 if ch.complete {
983 return fmt.Errorf("received unexpected RPL_ENDOFNAMES")
984 }
985 ch.complete = true
986
987 uc.forEachDownstream(func(dc *downstreamConn) {
988 forwardChannel(dc, ch)
989 })
990 case irc.RPL_WHOREPLY:
991 var channel, username, host, server, nick, mode, trailing string
992 if err := parseMessageParams(msg, nil, &channel, &username, &host, &server, &nick, &mode, &trailing); err != nil {
993 return err
994 }
995
996 parts := strings.SplitN(trailing, " ", 2)
997 if len(parts) != 2 {
998 return fmt.Errorf("received malformed RPL_WHOREPLY: wrong trailing parameter: %s", trailing)
999 }
1000 realname := parts[1]
1001 hops, err := strconv.Atoi(parts[0])
1002 if err != nil {
1003 return fmt.Errorf("received malformed RPL_WHOREPLY: wrong hop count: %s", parts[0])
1004 }
1005 hops++
1006
1007 trailing = strconv.Itoa(hops) + " " + realname
1008
1009 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1010 channel := channel
1011 if channel != "*" {
1012 channel = dc.marshalChannel(uc, channel)
1013 }
1014 nick := dc.marshalNick(uc, nick)
1015 dc.SendMessage(&irc.Message{
1016 Prefix: dc.srv.prefix(),
1017 Command: irc.RPL_WHOREPLY,
1018 Params: []string{dc.nick, channel, username, host, server, nick, mode, trailing},
1019 })
1020 })
1021 case irc.RPL_ENDOFWHO:
1022 var name string
1023 if err := parseMessageParams(msg, nil, &name); err != nil {
1024 return err
1025 }
1026
1027 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1028 name := name
1029 if name != "*" {
1030 // TODO: support WHO masks
1031 name = dc.marshalEntity(uc, name)
1032 }
1033 dc.SendMessage(&irc.Message{
1034 Prefix: dc.srv.prefix(),
1035 Command: irc.RPL_ENDOFWHO,
1036 Params: []string{dc.nick, name, "End of /WHO list"},
1037 })
1038 })
1039 case irc.RPL_WHOISUSER:
1040 var nick, username, host, realname string
1041 if err := parseMessageParams(msg, nil, &nick, &username, &host, nil, &realname); err != nil {
1042 return err
1043 }
1044
1045 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1046 nick := dc.marshalNick(uc, nick)
1047 dc.SendMessage(&irc.Message{
1048 Prefix: dc.srv.prefix(),
1049 Command: irc.RPL_WHOISUSER,
1050 Params: []string{dc.nick, nick, username, host, "*", realname},
1051 })
1052 })
1053 case irc.RPL_WHOISSERVER:
1054 var nick, server, serverInfo string
1055 if err := parseMessageParams(msg, nil, &nick, &server, &serverInfo); err != nil {
1056 return err
1057 }
1058
1059 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1060 nick := dc.marshalNick(uc, nick)
1061 dc.SendMessage(&irc.Message{
1062 Prefix: dc.srv.prefix(),
1063 Command: irc.RPL_WHOISSERVER,
1064 Params: []string{dc.nick, nick, server, serverInfo},
1065 })
1066 })
1067 case irc.RPL_WHOISOPERATOR:
1068 var nick string
1069 if err := parseMessageParams(msg, nil, &nick); err != nil {
1070 return err
1071 }
1072
1073 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1074 nick := dc.marshalNick(uc, nick)
1075 dc.SendMessage(&irc.Message{
1076 Prefix: dc.srv.prefix(),
1077 Command: irc.RPL_WHOISOPERATOR,
1078 Params: []string{dc.nick, nick, "is an IRC operator"},
1079 })
1080 })
1081 case irc.RPL_WHOISIDLE:
1082 var nick string
1083 if err := parseMessageParams(msg, nil, &nick, nil); err != nil {
1084 return err
1085 }
1086
1087 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1088 nick := dc.marshalNick(uc, nick)
1089 params := []string{dc.nick, nick}
1090 params = append(params, msg.Params[2:]...)
1091 dc.SendMessage(&irc.Message{
1092 Prefix: dc.srv.prefix(),
1093 Command: irc.RPL_WHOISIDLE,
1094 Params: params,
1095 })
1096 })
1097 case irc.RPL_WHOISCHANNELS:
1098 var nick, channelList string
1099 if err := parseMessageParams(msg, nil, &nick, &channelList); err != nil {
1100 return err
1101 }
1102 channels := splitSpace(channelList)
1103
1104 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1105 nick := dc.marshalNick(uc, nick)
1106 channelList := make([]string, len(channels))
1107 for i, channel := range channels {
1108 prefix, channel := uc.parseMembershipPrefix(channel)
1109 channel = dc.marshalChannel(uc, channel)
1110 channelList[i] = prefix.String() + channel
1111 }
1112 channels := strings.Join(channelList, " ")
1113 dc.SendMessage(&irc.Message{
1114 Prefix: dc.srv.prefix(),
1115 Command: irc.RPL_WHOISCHANNELS,
1116 Params: []string{dc.nick, nick, channels},
1117 })
1118 })
1119 case irc.RPL_ENDOFWHOIS:
1120 var nick string
1121 if err := parseMessageParams(msg, nil, &nick); err != nil {
1122 return err
1123 }
1124
1125 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1126 nick := dc.marshalNick(uc, nick)
1127 dc.SendMessage(&irc.Message{
1128 Prefix: dc.srv.prefix(),
1129 Command: irc.RPL_ENDOFWHOIS,
1130 Params: []string{dc.nick, nick, "End of /WHOIS list"},
1131 })
1132 })
1133 case "PRIVMSG":
1134 if msg.Prefix == nil {
1135 return fmt.Errorf("expected a prefix")
1136 }
1137
1138 var nick, text string
1139 if err := parseMessageParams(msg, &nick, &text); err != nil {
1140 return err
1141 }
1142
1143 if msg.Prefix.Name == serviceNick {
1144 uc.logger.Printf("skipping PRIVMSG from soju's service: %v", msg)
1145 break
1146 }
1147 if nick == serviceNick {
1148 uc.logger.Printf("skipping PRIVMSG to soju's service: %v", msg)
1149 break
1150 }
1151
1152 if _, ok := msg.Tags["time"]; !ok {
1153 msg.Tags["time"] = irc.TagValue(time.Now().Format(serverTimeLayout))
1154 }
1155
1156 target := nick
1157 if nick == uc.nick {
1158 target = msg.Prefix.Name
1159 }
1160 uc.appendLog(target, msg)
1161
1162 uc.network.ring.Produce(msg)
1163 case "INVITE":
1164 var nick string
1165 var channel string
1166 if err := parseMessageParams(msg, &nick, &channel); err != nil {
1167 return err
1168 }
1169
1170 uc.forEachDownstream(func(dc *downstreamConn) {
1171 dc.SendMessage(&irc.Message{
1172 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
1173 Command: "INVITE",
1174 Params: []string{dc.marshalNick(uc, nick), dc.marshalChannel(uc, channel)},
1175 })
1176 })
1177 case irc.RPL_INVITING:
1178 var nick string
1179 var channel string
1180 if err := parseMessageParams(msg, &nick, &channel); err != nil {
1181 return err
1182 }
1183
1184 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1185 dc.SendMessage(&irc.Message{
1186 Prefix: dc.srv.prefix(),
1187 Command: irc.RPL_INVITING,
1188 Params: []string{dc.nick, dc.marshalNick(uc, nick), dc.marshalChannel(uc, channel)},
1189 })
1190 })
1191 case irc.ERR_UNKNOWNCOMMAND, irc.RPL_TRYAGAIN:
1192 var command, reason string
1193 if err := parseMessageParams(msg, nil, &command, &reason); err != nil {
1194 return err
1195 }
1196
1197 if command == "LIST" {
1198 ok := uc.endPendingLISTs(false)
1199 if !ok {
1200 return fmt.Errorf("unexpected response for LIST: %q: no matching pending LIST", msg.Command)
1201 }
1202 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1203 dc.SendMessage(&irc.Message{
1204 Prefix: uc.srv.prefix(),
1205 Command: msg.Command,
1206 Params: []string{dc.nick, "LIST", reason},
1207 })
1208 })
1209 }
1210 case "TAGMSG":
1211 // TODO: relay to downstream connections that accept message-tags
1212 case "ACK":
1213 // Ignore
1214 case irc.RPL_NOWAWAY, irc.RPL_UNAWAY:
1215 // Ignore
1216 case irc.RPL_YOURHOST, irc.RPL_CREATED:
1217 // Ignore
1218 case irc.RPL_LUSERCLIENT, irc.RPL_LUSEROP, irc.RPL_LUSERUNKNOWN, irc.RPL_LUSERCHANNELS, irc.RPL_LUSERME:
1219 // Ignore
1220 case irc.RPL_MOTDSTART, irc.RPL_MOTD, irc.RPL_ENDOFMOTD:
1221 // Ignore
1222 case irc.RPL_LISTSTART:
1223 // Ignore
1224 case rpl_localusers, rpl_globalusers:
1225 // Ignore
1226 case irc.RPL_STATSVLINE, rpl_statsping, irc.RPL_STATSBLINE, irc.RPL_STATSDLINE:
1227 // Ignore
1228 default:
1229 uc.logger.Printf("unhandled message: %v", msg)
1230 }
1231 return nil
1232}
1233
// splitSpace cuts s into the fields separated by one or more ASCII space
// characters (' '). Other whitespace such as tabs is not treated as a
// separator, and runs of spaces never produce empty fields.
func splitSpace(s string) []string {
	isSpace := func(c rune) bool { return c == ' ' }
	return strings.FieldsFunc(s, isSpace)
}
1239
1240func (uc *upstreamConn) register() {
1241 uc.nick = uc.network.Nick
1242 uc.username = uc.network.Username
1243 if uc.username == "" {
1244 uc.username = uc.nick
1245 }
1246 uc.realname = uc.network.Realname
1247 if uc.realname == "" {
1248 uc.realname = uc.nick
1249 }
1250
1251 uc.SendMessage(&irc.Message{
1252 Command: "CAP",
1253 Params: []string{"LS", "302"},
1254 })
1255
1256 if uc.network.Pass != "" {
1257 uc.SendMessage(&irc.Message{
1258 Command: "PASS",
1259 Params: []string{uc.network.Pass},
1260 })
1261 }
1262
1263 uc.SendMessage(&irc.Message{
1264 Command: "NICK",
1265 Params: []string{uc.nick},
1266 })
1267 uc.SendMessage(&irc.Message{
1268 Command: "USER",
1269 Params: []string{uc.username, "0", "*", uc.realname},
1270 })
1271}
1272
1273func (uc *upstreamConn) runUntilRegistered() error {
1274 for !uc.registered {
1275 msg, err := uc.ReadMessage()
1276 if err != nil {
1277 return fmt.Errorf("failed to read message: %v", err)
1278 }
1279
1280 if err := uc.handleMessage(msg); err != nil {
1281 return fmt.Errorf("failed to handle message %q: %v", msg, err)
1282 }
1283 }
1284
1285 return nil
1286}
1287
1288func (uc *upstreamConn) requestSASL() bool {
1289 if uc.network.SASL.Mechanism == "" {
1290 return false
1291 }
1292
1293 v, ok := uc.caps["sasl"]
1294 if !ok {
1295 return false
1296 }
1297 if v != "" {
1298 mechanisms := strings.Split(v, ",")
1299 found := false
1300 for _, mech := range mechanisms {
1301 if strings.EqualFold(mech, uc.network.SASL.Mechanism) {
1302 found = true
1303 break
1304 }
1305 }
1306 if !found {
1307 return false
1308 }
1309 }
1310
1311 return true
1312}
1313
1314func (uc *upstreamConn) handleCapAck(name string, ok bool) error {
1315 auth := &uc.network.SASL
1316 switch name {
1317 case "sasl":
1318 if !ok {
1319 uc.logger.Printf("server refused to acknowledge the SASL capability")
1320 return nil
1321 }
1322
1323 switch auth.Mechanism {
1324 case "PLAIN":
1325 uc.logger.Printf("starting SASL PLAIN authentication with username %q", auth.Plain.Username)
1326 uc.saslClient = sasl.NewPlainClient("", auth.Plain.Username, auth.Plain.Password)
1327 default:
1328 return fmt.Errorf("unsupported SASL mechanism %q", name)
1329 }
1330
1331 uc.SendMessage(&irc.Message{
1332 Command: "AUTHENTICATE",
1333 Params: []string{auth.Mechanism},
1334 })
1335 case "message-tags":
1336 uc.tagsSupported = ok
1337 case "labeled-response":
1338 uc.labelsSupported = ok
1339 case "batch", "server-time":
1340 // Nothing to do
1341 default:
1342 uc.logger.Printf("received CAP ACK/NAK for a cap we don't support: %v", name)
1343 }
1344 return nil
1345}
1346
1347func (uc *upstreamConn) readMessages(ch chan<- event) error {
1348 for {
1349 msg, err := uc.ReadMessage()
1350 if err == io.EOF {
1351 break
1352 } else if err != nil {
1353 return fmt.Errorf("failed to read IRC command: %v", err)
1354 }
1355
1356 ch <- eventUpstreamMessage{msg, uc}
1357 }
1358
1359 return nil
1360}
1361
1362func (uc *upstreamConn) SendMessageLabeled(downstreamID uint64, msg *irc.Message) {
1363 if uc.labelsSupported {
1364 if msg.Tags == nil {
1365 msg.Tags = make(map[string]irc.TagValue)
1366 }
1367 msg.Tags["label"] = irc.TagValue(fmt.Sprintf("sd-%d-%d", downstreamID, uc.nextLabelID))
1368 uc.nextLabelID++
1369 }
1370 uc.SendMessage(msg)
1371}
1372
1373// TODO: handle moving logs when a network name changes, when support for this is added
1374func (uc *upstreamConn) appendLog(entity string, msg *irc.Message) {
1375 if uc.srv.LogPath == "" {
1376 return
1377 }
1378
1379 ml, ok := uc.messageLoggers[entity]
1380 if !ok {
1381 ml = newMessageLogger(uc, entity)
1382 uc.messageLoggers[entity] = ml
1383 }
1384
1385 if err := ml.Append(msg); err != nil {
1386 uc.logger.Printf("failed to log message: %v", err)
1387 }
1388}
1389
1390func (uc *upstreamConn) updateAway() {
1391 away := true
1392 uc.forEachDownstream(func(*downstreamConn) {
1393 away = false
1394 })
1395 if away == uc.away {
1396 return
1397 }
1398 if away {
1399 uc.SendMessage(&irc.Message{
1400 Command: "AWAY",
1401 Params: []string{"Auto away"},
1402 })
1403 } else {
1404 uc.SendMessage(&irc.Message{
1405 Command: "AWAY",
1406 })
1407 }
1408 uc.away = away
1409}
Note: See TracBrowser for help on using the repository browser.