source: code/trunk/upstream.go@ 270

Last change on this file since 270 was 270, checked in by delthas, 5 years ago

Add support for the irc+insecure address scheme

Some servers do not support TLS, or have invalid, expired or self-signed
TLS certificates. While the right fix would be to contact each server
owner to add support for valid TLS, supporting plaintext upstream
connections is sometimes necessary.

This adds support for the irc+insecure address scheme, which connects to
a network in plain-text over TCP.

File size: 34.1 KB
Line 
1package soju
2
3import (
4 "crypto/tls"
5 "encoding/base64"
6 "errors"
7 "fmt"
8 "io"
9 "net"
10 "strconv"
11 "strings"
12 "time"
13
14 "github.com/emersion/go-sasl"
15 "gopkg.in/irc.v3"
16)
17
18type upstreamChannel struct {
19 Name string
20 conn *upstreamConn
21 Topic string
22 TopicWho string
23 TopicTime time.Time
24 Status channelStatus
25 modes channelModes
26 creationTime string
27 Members map[string]*membership
28 complete bool
29}
30
31type upstreamConn struct {
32 conn
33
34 network *network
35 user *user
36
37 serverName string
38 availableUserModes string
39 availableChannelModes map[byte]channelModeType
40 availableChannelTypes string
41 availableMemberships []membership
42
43 registered bool
44 nick string
45 username string
46 realname string
47 modes userModes
48 channels map[string]*upstreamChannel
49 caps map[string]string
50 batches map[string]batch
51 away bool
52
53 tagsSupported bool
54 labelsSupported bool
55 nextLabelID uint64
56
57 saslClient sasl.Client
58 saslStarted bool
59
60 // set of LIST commands in progress, per downstream
61 pendingLISTDownstreamSet map[uint64]struct{}
62
63 messageLoggers map[string]*messageLogger
64}
65
66func connectToUpstream(network *network) (*upstreamConn, error) {
67 logger := &prefixLogger{network.user.srv.Logger, fmt.Sprintf("upstream %q: ", network.Addr)}
68
69 var scheme string
70 var addr string
71
72 addrParts := strings.SplitN(network.Addr, "://", 2)
73 if len(addrParts) == 2 {
74 scheme = addrParts[0]
75 addr = addrParts[1]
76 } else {
77 scheme = "ircs"
78 addr = addrParts[0]
79 }
80
81 dialer := net.Dialer{Timeout: connectTimeout}
82
83 var netConn net.Conn
84 var err error
85 switch scheme {
86 case "ircs":
87 if !strings.ContainsRune(addr, ':') {
88 addr = addr + ":6697"
89 }
90
91 logger.Printf("connecting to TLS server at address %q", addr)
92 netConn, err = tls.DialWithDialer(&dialer, "tcp", addr, nil)
93 case "irc+insecure":
94 if !strings.ContainsRune(addr, ':') {
95 addr = addr + ":6667"
96 }
97
98 logger.Printf("connecting to plain-text server at address %q", addr)
99 netConn, err = dialer.Dial("tcp", addr)
100 default:
101 return nil, fmt.Errorf("failed to dial %q: unknown scheme: %v", addr, scheme)
102 }
103 if err != nil {
104 return nil, fmt.Errorf("failed to dial %q: %v", addr, err)
105 }
106
107 uc := &upstreamConn{
108 conn: *newConn(network.user.srv, netConn, logger),
109 network: network,
110 user: network.user,
111 channels: make(map[string]*upstreamChannel),
112 caps: make(map[string]string),
113 batches: make(map[string]batch),
114 availableChannelTypes: stdChannelTypes,
115 availableChannelModes: stdChannelModes,
116 availableMemberships: stdMemberships,
117 pendingLISTDownstreamSet: make(map[uint64]struct{}),
118 messageLoggers: make(map[string]*messageLogger),
119 }
120
121 return uc, nil
122}
123
124func (uc *upstreamConn) forEachDownstream(f func(*downstreamConn)) {
125 uc.network.forEachDownstream(f)
126}
127
128func (uc *upstreamConn) forEachDownstreamByID(id uint64, f func(*downstreamConn)) {
129 uc.forEachDownstream(func(dc *downstreamConn) {
130 if id != 0 && id != dc.id {
131 return
132 }
133 f(dc)
134 })
135}
136
137func (uc *upstreamConn) getChannel(name string) (*upstreamChannel, error) {
138 ch, ok := uc.channels[name]
139 if !ok {
140 return nil, fmt.Errorf("unknown channel %q", name)
141 }
142 return ch, nil
143}
144
145func (uc *upstreamConn) isChannel(entity string) bool {
146 if i := strings.IndexByte(uc.availableChannelTypes, entity[0]); i >= 0 {
147 return true
148 }
149 return false
150}
151
152func (uc *upstreamConn) getPendingLIST() *pendingLIST {
153 for _, pl := range uc.user.pendingLISTs {
154 if _, ok := pl.pendingCommands[uc.network.ID]; !ok {
155 continue
156 }
157 return &pl
158 }
159 return nil
160}
161
162func (uc *upstreamConn) endPendingLISTs(all bool) (found bool) {
163 found = false
164 for i := 0; i < len(uc.user.pendingLISTs); i++ {
165 pl := uc.user.pendingLISTs[i]
166 if _, ok := pl.pendingCommands[uc.network.ID]; !ok {
167 continue
168 }
169 delete(pl.pendingCommands, uc.network.ID)
170 if len(pl.pendingCommands) == 0 {
171 uc.user.pendingLISTs = append(uc.user.pendingLISTs[:i], uc.user.pendingLISTs[i+1:]...)
172 i--
173 uc.forEachDownstreamByID(pl.downstreamID, func(dc *downstreamConn) {
174 dc.SendMessage(&irc.Message{
175 Prefix: dc.srv.prefix(),
176 Command: irc.RPL_LISTEND,
177 Params: []string{dc.nick, "End of /LIST"},
178 })
179 })
180 }
181 found = true
182 if !all {
183 delete(uc.pendingLISTDownstreamSet, pl.downstreamID)
184 uc.user.forEachUpstream(func(uc *upstreamConn) {
185 uc.trySendLIST(pl.downstreamID)
186 })
187 return
188 }
189 }
190 return
191}
192
193func (uc *upstreamConn) trySendLIST(downstreamID uint64) {
194 if _, ok := uc.pendingLISTDownstreamSet[downstreamID]; ok {
195 // a LIST command is already pending
196 // we will try again when that command is completed
197 return
198 }
199
200 for _, pl := range uc.user.pendingLISTs {
201 if pl.downstreamID != downstreamID {
202 continue
203 }
204 // this is the first pending LIST command list of the downstream
205 listCommand, ok := pl.pendingCommands[uc.network.ID]
206 if !ok {
207 // there is no command for this upstream in these LIST commands
208 // do not send anything
209 continue
210 }
211 // there is a command for this upstream in these LIST commands
212 // send it now
213
214 uc.SendMessageLabeled(downstreamID, listCommand)
215
216 uc.pendingLISTDownstreamSet[downstreamID] = struct{}{}
217 return
218 }
219}
220
221func (uc *upstreamConn) parseMembershipPrefix(s string) (membership *membership, nick string) {
222 for _, m := range uc.availableMemberships {
223 if m.Prefix == s[0] {
224 return &m, s[1:]
225 }
226 }
227 return nil, s
228}
229
230func (uc *upstreamConn) handleMessage(msg *irc.Message) error {
231 var label string
232 if l, ok := msg.GetTag("label"); ok {
233 label = l
234 }
235
236 var msgBatch *batch
237 if batchName, ok := msg.GetTag("batch"); ok {
238 b, ok := uc.batches[batchName]
239 if !ok {
240 return fmt.Errorf("unexpected batch reference: batch was not defined: %q", batchName)
241 }
242 msgBatch = &b
243 if label == "" {
244 label = msgBatch.Label
245 }
246 }
247
248 var downstreamID uint64 = 0
249 if label != "" {
250 var labelOffset uint64
251 n, err := fmt.Sscanf(label, "sd-%d-%d", &downstreamID, &labelOffset)
252 if err == nil && n < 2 {
253 err = errors.New("not enough arguments")
254 }
255 if err != nil {
256 return fmt.Errorf("unexpected message label: invalid downstream reference for label %q: %v", label, err)
257 }
258 }
259
260 if _, ok := msg.Tags["time"]; !ok {
261 msg.Tags["time"] = irc.TagValue(time.Now().UTC().Format(serverTimeLayout))
262 }
263
264 switch msg.Command {
265 case "PING":
266 uc.SendMessage(&irc.Message{
267 Command: "PONG",
268 Params: msg.Params,
269 })
270 return nil
271 case "NOTICE":
272 if msg.Prefix.User == "" && msg.Prefix.Host == "" { // server message
273 uc.produce("", msg, nil)
274 } else { // regular user NOTICE
275 var entity, text string
276 if err := parseMessageParams(msg, &entity, &text); err != nil {
277 return err
278 }
279
280 target := entity
281 if target == uc.nick {
282 target = msg.Prefix.Name
283 }
284 uc.produce(target, msg, nil)
285 }
286 case "CAP":
287 var subCmd string
288 if err := parseMessageParams(msg, nil, &subCmd); err != nil {
289 return err
290 }
291 subCmd = strings.ToUpper(subCmd)
292 subParams := msg.Params[2:]
293 switch subCmd {
294 case "LS":
295 if len(subParams) < 1 {
296 return newNeedMoreParamsError(msg.Command)
297 }
298 caps := strings.Fields(subParams[len(subParams)-1])
299 more := len(subParams) >= 2 && msg.Params[len(subParams)-2] == "*"
300
301 for _, s := range caps {
302 kv := strings.SplitN(s, "=", 2)
303 k := strings.ToLower(kv[0])
304 var v string
305 if len(kv) == 2 {
306 v = kv[1]
307 }
308 uc.caps[k] = v
309 }
310
311 if more {
312 break // wait to receive all capabilities
313 }
314
315 requestCaps := make([]string, 0, 16)
316 for _, c := range []string{"message-tags", "batch", "labeled-response", "server-time"} {
317 if _, ok := uc.caps[c]; ok {
318 requestCaps = append(requestCaps, c)
319 }
320 }
321
322 if uc.requestSASL() {
323 requestCaps = append(requestCaps, "sasl")
324 }
325
326 if len(requestCaps) > 0 {
327 uc.SendMessage(&irc.Message{
328 Command: "CAP",
329 Params: []string{"REQ", strings.Join(requestCaps, " ")},
330 })
331 }
332
333 if uc.requestSASL() {
334 break // we'll send CAP END after authentication is completed
335 }
336
337 uc.SendMessage(&irc.Message{
338 Command: "CAP",
339 Params: []string{"END"},
340 })
341 case "ACK", "NAK":
342 if len(subParams) < 1 {
343 return newNeedMoreParamsError(msg.Command)
344 }
345 caps := strings.Fields(subParams[0])
346
347 for _, name := range caps {
348 if err := uc.handleCapAck(strings.ToLower(name), subCmd == "ACK"); err != nil {
349 return err
350 }
351 }
352
353 if uc.saslClient == nil {
354 uc.SendMessage(&irc.Message{
355 Command: "CAP",
356 Params: []string{"END"},
357 })
358 }
359 default:
360 uc.logger.Printf("unhandled message: %v", msg)
361 }
362 case "AUTHENTICATE":
363 if uc.saslClient == nil {
364 return fmt.Errorf("received unexpected AUTHENTICATE message")
365 }
366
367 // TODO: if a challenge is 400 bytes long, buffer it
368 var challengeStr string
369 if err := parseMessageParams(msg, &challengeStr); err != nil {
370 uc.SendMessage(&irc.Message{
371 Command: "AUTHENTICATE",
372 Params: []string{"*"},
373 })
374 return err
375 }
376
377 var challenge []byte
378 if challengeStr != "+" {
379 var err error
380 challenge, err = base64.StdEncoding.DecodeString(challengeStr)
381 if err != nil {
382 uc.SendMessage(&irc.Message{
383 Command: "AUTHENTICATE",
384 Params: []string{"*"},
385 })
386 return err
387 }
388 }
389
390 var resp []byte
391 var err error
392 if !uc.saslStarted {
393 _, resp, err = uc.saslClient.Start()
394 uc.saslStarted = true
395 } else {
396 resp, err = uc.saslClient.Next(challenge)
397 }
398 if err != nil {
399 uc.SendMessage(&irc.Message{
400 Command: "AUTHENTICATE",
401 Params: []string{"*"},
402 })
403 return err
404 }
405
406 // TODO: send response in multiple chunks if >= 400 bytes
407 var respStr = "+"
408 if resp != nil {
409 respStr = base64.StdEncoding.EncodeToString(resp)
410 }
411
412 uc.SendMessage(&irc.Message{
413 Command: "AUTHENTICATE",
414 Params: []string{respStr},
415 })
416 case irc.RPL_LOGGEDIN:
417 var account string
418 if err := parseMessageParams(msg, nil, nil, &account); err != nil {
419 return err
420 }
421 uc.logger.Printf("logged in with account %q", account)
422 case irc.RPL_LOGGEDOUT:
423 uc.logger.Printf("logged out")
424 case irc.ERR_NICKLOCKED, irc.RPL_SASLSUCCESS, irc.ERR_SASLFAIL, irc.ERR_SASLTOOLONG, irc.ERR_SASLABORTED:
425 var info string
426 if err := parseMessageParams(msg, nil, &info); err != nil {
427 return err
428 }
429 switch msg.Command {
430 case irc.ERR_NICKLOCKED:
431 uc.logger.Printf("invalid nick used with SASL authentication: %v", info)
432 case irc.ERR_SASLFAIL:
433 uc.logger.Printf("SASL authentication failed: %v", info)
434 case irc.ERR_SASLTOOLONG:
435 uc.logger.Printf("SASL message too long: %v", info)
436 }
437
438 uc.saslClient = nil
439 uc.saslStarted = false
440
441 uc.SendMessage(&irc.Message{
442 Command: "CAP",
443 Params: []string{"END"},
444 })
445 case irc.RPL_WELCOME:
446 uc.registered = true
447 uc.logger.Printf("connection registered")
448
449 for _, ch := range uc.network.channels {
450 params := []string{ch.Name}
451 if ch.Key != "" {
452 params = append(params, ch.Key)
453 }
454 uc.SendMessage(&irc.Message{
455 Command: "JOIN",
456 Params: params,
457 })
458 }
459 case irc.RPL_MYINFO:
460 if err := parseMessageParams(msg, nil, &uc.serverName, nil, &uc.availableUserModes, nil); err != nil {
461 return err
462 }
463 case irc.RPL_ISUPPORT:
464 if err := parseMessageParams(msg, nil, nil); err != nil {
465 return err
466 }
467 for _, token := range msg.Params[1 : len(msg.Params)-1] {
468 negate := false
469 parameter := token
470 value := ""
471 if strings.HasPrefix(token, "-") {
472 negate = true
473 token = token[1:]
474 } else {
475 if i := strings.IndexByte(token, '='); i >= 0 {
476 parameter = token[:i]
477 value = token[i+1:]
478 }
479 }
480 if !negate {
481 switch parameter {
482 case "CHANMODES":
483 parts := strings.SplitN(value, ",", 5)
484 if len(parts) < 4 {
485 return fmt.Errorf("malformed ISUPPORT CHANMODES value: %v", value)
486 }
487 modes := make(map[byte]channelModeType)
488 for i, mt := range []channelModeType{modeTypeA, modeTypeB, modeTypeC, modeTypeD} {
489 for j := 0; j < len(parts[i]); j++ {
490 mode := parts[i][j]
491 modes[mode] = mt
492 }
493 }
494 uc.availableChannelModes = modes
495 case "CHANTYPES":
496 uc.availableChannelTypes = value
497 case "PREFIX":
498 if value == "" {
499 uc.availableMemberships = nil
500 } else {
501 if value[0] != '(' {
502 return fmt.Errorf("malformed ISUPPORT PREFIX value: %v", value)
503 }
504 sep := strings.IndexByte(value, ')')
505 if sep < 0 || len(value) != sep*2 {
506 return fmt.Errorf("malformed ISUPPORT PREFIX value: %v", value)
507 }
508 memberships := make([]membership, len(value)/2-1)
509 for i := range memberships {
510 memberships[i] = membership{
511 Mode: value[i+1],
512 Prefix: value[sep+i+1],
513 }
514 }
515 uc.availableMemberships = memberships
516 }
517 }
518 } else {
519 // TODO: handle ISUPPORT negations
520 }
521 }
522 case "BATCH":
523 var tag string
524 if err := parseMessageParams(msg, &tag); err != nil {
525 return err
526 }
527
528 if strings.HasPrefix(tag, "+") {
529 tag = tag[1:]
530 if _, ok := uc.batches[tag]; ok {
531 return fmt.Errorf("unexpected BATCH reference tag: batch was already defined: %q", tag)
532 }
533 var batchType string
534 if err := parseMessageParams(msg, nil, &batchType); err != nil {
535 return err
536 }
537 label := label
538 if label == "" && msgBatch != nil {
539 label = msgBatch.Label
540 }
541 uc.batches[tag] = batch{
542 Type: batchType,
543 Params: msg.Params[2:],
544 Outer: msgBatch,
545 Label: label,
546 }
547 } else if strings.HasPrefix(tag, "-") {
548 tag = tag[1:]
549 if _, ok := uc.batches[tag]; !ok {
550 return fmt.Errorf("unknown BATCH reference tag: %q", tag)
551 }
552 delete(uc.batches, tag)
553 } else {
554 return fmt.Errorf("unexpected BATCH reference tag: missing +/- prefix: %q", tag)
555 }
556 case "NICK":
557 if msg.Prefix == nil {
558 return fmt.Errorf("expected a prefix")
559 }
560
561 var newNick string
562 if err := parseMessageParams(msg, &newNick); err != nil {
563 return err
564 }
565
566 me := false
567 if msg.Prefix.Name == uc.nick {
568 uc.logger.Printf("changed nick from %q to %q", uc.nick, newNick)
569 me = true
570 uc.nick = newNick
571 }
572
573 for _, ch := range uc.channels {
574 if membership, ok := ch.Members[msg.Prefix.Name]; ok {
575 delete(ch.Members, msg.Prefix.Name)
576 ch.Members[newNick] = membership
577 uc.appendLog(ch.Name, msg)
578 uc.appendHistory(ch.Name, msg)
579 }
580 }
581
582 if !me {
583 uc.forEachDownstream(func(dc *downstreamConn) {
584 dc.SendMessage(dc.marshalMessage(msg, uc.network))
585 })
586 }
587 case "JOIN":
588 if msg.Prefix == nil {
589 return fmt.Errorf("expected a prefix")
590 }
591
592 var channels string
593 if err := parseMessageParams(msg, &channels); err != nil {
594 return err
595 }
596
597 for _, ch := range strings.Split(channels, ",") {
598 if msg.Prefix.Name == uc.nick {
599 uc.logger.Printf("joined channel %q", ch)
600 uc.channels[ch] = &upstreamChannel{
601 Name: ch,
602 conn: uc,
603 Members: make(map[string]*membership),
604 }
605
606 uc.SendMessage(&irc.Message{
607 Command: "MODE",
608 Params: []string{ch},
609 })
610 } else {
611 ch, err := uc.getChannel(ch)
612 if err != nil {
613 return err
614 }
615 ch.Members[msg.Prefix.Name] = nil
616 }
617
618 chMsg := msg.Copy()
619 chMsg.Params[0] = ch
620 uc.produce(ch, chMsg, nil)
621 }
622 case "PART":
623 if msg.Prefix == nil {
624 return fmt.Errorf("expected a prefix")
625 }
626
627 var channels string
628 if err := parseMessageParams(msg, &channels); err != nil {
629 return err
630 }
631
632 for _, ch := range strings.Split(channels, ",") {
633 if msg.Prefix.Name == uc.nick {
634 uc.logger.Printf("parted channel %q", ch)
635 delete(uc.channels, ch)
636 } else {
637 ch, err := uc.getChannel(ch)
638 if err != nil {
639 return err
640 }
641 delete(ch.Members, msg.Prefix.Name)
642 }
643
644 chMsg := msg.Copy()
645 chMsg.Params[0] = ch
646 uc.produce(ch, chMsg, nil)
647 }
648 case "KICK":
649 if msg.Prefix == nil {
650 return fmt.Errorf("expected a prefix")
651 }
652
653 var channel, user string
654 if err := parseMessageParams(msg, &channel, &user); err != nil {
655 return err
656 }
657
658 if user == uc.nick {
659 uc.logger.Printf("kicked from channel %q by %s", channel, msg.Prefix.Name)
660 delete(uc.channels, channel)
661 } else {
662 ch, err := uc.getChannel(channel)
663 if err != nil {
664 return err
665 }
666 delete(ch.Members, user)
667 }
668
669 uc.produce(channel, msg, nil)
670 case "QUIT":
671 if msg.Prefix == nil {
672 return fmt.Errorf("expected a prefix")
673 }
674
675 if msg.Prefix.Name == uc.nick {
676 uc.logger.Printf("quit")
677 }
678
679 for _, ch := range uc.channels {
680 if _, ok := ch.Members[msg.Prefix.Name]; ok {
681 delete(ch.Members, msg.Prefix.Name)
682
683 uc.appendLog(ch.Name, msg)
684 uc.appendHistory(ch.Name, msg)
685 }
686 }
687
688 if msg.Prefix.Name != uc.nick {
689 uc.forEachDownstream(func(dc *downstreamConn) {
690 dc.SendMessage(dc.marshalMessage(msg, uc.network))
691 })
692 }
693 case irc.RPL_TOPIC, irc.RPL_NOTOPIC:
694 var name, topic string
695 if err := parseMessageParams(msg, nil, &name, &topic); err != nil {
696 return err
697 }
698 ch, err := uc.getChannel(name)
699 if err != nil {
700 return err
701 }
702 if msg.Command == irc.RPL_TOPIC {
703 ch.Topic = topic
704 } else {
705 ch.Topic = ""
706 }
707 case "TOPIC":
708 var name string
709 if err := parseMessageParams(msg, &name); err != nil {
710 return err
711 }
712 ch, err := uc.getChannel(name)
713 if err != nil {
714 return err
715 }
716 if len(msg.Params) > 1 {
717 ch.Topic = msg.Params[1]
718 } else {
719 ch.Topic = ""
720 }
721 uc.produce(ch.Name, msg, nil)
722 case "MODE":
723 var name, modeStr string
724 if err := parseMessageParams(msg, &name, &modeStr); err != nil {
725 return err
726 }
727
728 if !uc.isChannel(name) { // user mode change
729 if name != uc.nick {
730 return fmt.Errorf("received MODE message for unknown nick %q", name)
731 }
732 return uc.modes.Apply(modeStr)
733 // TODO: notify downstreams about user mode change?
734 } else { // channel mode change
735 ch, err := uc.getChannel(name)
736 if err != nil {
737 return err
738 }
739
740 if ch.modes != nil {
741 if err := ch.modes.Apply(uc.availableChannelModes, modeStr, msg.Params[2:]...); err != nil {
742 return err
743 }
744 }
745
746 uc.produce(ch.Name, msg, nil)
747 }
748 case irc.RPL_UMODEIS:
749 if err := parseMessageParams(msg, nil); err != nil {
750 return err
751 }
752 modeStr := ""
753 if len(msg.Params) > 1 {
754 modeStr = msg.Params[1]
755 }
756
757 uc.modes = ""
758 if err := uc.modes.Apply(modeStr); err != nil {
759 return err
760 }
761 // TODO: send RPL_UMODEIS to downstream connections when applicable
762 case irc.RPL_CHANNELMODEIS:
763 var channel string
764 if err := parseMessageParams(msg, nil, &channel); err != nil {
765 return err
766 }
767 modeStr := ""
768 if len(msg.Params) > 2 {
769 modeStr = msg.Params[2]
770 }
771
772 ch, err := uc.getChannel(channel)
773 if err != nil {
774 return err
775 }
776
777 firstMode := ch.modes == nil
778 ch.modes = make(map[byte]string)
779 if err := ch.modes.Apply(uc.availableChannelModes, modeStr, msg.Params[3:]...); err != nil {
780 return err
781 }
782 if firstMode {
783 modeStr, modeParams := ch.modes.Format()
784
785 uc.forEachDownstream(func(dc *downstreamConn) {
786 params := []string{dc.nick, dc.marshalEntity(uc.network, channel), modeStr}
787 params = append(params, modeParams...)
788
789 dc.SendMessage(&irc.Message{
790 Prefix: dc.srv.prefix(),
791 Command: irc.RPL_CHANNELMODEIS,
792 Params: params,
793 })
794 })
795 }
796 case rpl_creationtime:
797 var channel, creationTime string
798 if err := parseMessageParams(msg, nil, &channel, &creationTime); err != nil {
799 return err
800 }
801
802 ch, err := uc.getChannel(channel)
803 if err != nil {
804 return err
805 }
806
807 firstCreationTime := ch.creationTime == ""
808 ch.creationTime = creationTime
809 if firstCreationTime {
810 uc.forEachDownstream(func(dc *downstreamConn) {
811 dc.SendMessage(&irc.Message{
812 Prefix: dc.srv.prefix(),
813 Command: rpl_creationtime,
814 Params: []string{dc.nick, channel, creationTime},
815 })
816 })
817 }
818 case rpl_topicwhotime:
819 var name, who, timeStr string
820 if err := parseMessageParams(msg, nil, &name, &who, &timeStr); err != nil {
821 return err
822 }
823 ch, err := uc.getChannel(name)
824 if err != nil {
825 return err
826 }
827 ch.TopicWho = who
828 sec, err := strconv.ParseInt(timeStr, 10, 64)
829 if err != nil {
830 return fmt.Errorf("failed to parse topic time: %v", err)
831 }
832 ch.TopicTime = time.Unix(sec, 0)
833 case irc.RPL_LIST:
834 var channel, clients, topic string
835 if err := parseMessageParams(msg, nil, &channel, &clients, &topic); err != nil {
836 return err
837 }
838
839 pl := uc.getPendingLIST()
840 if pl == nil {
841 return fmt.Errorf("unexpected RPL_LIST: no matching pending LIST")
842 }
843
844 uc.forEachDownstreamByID(pl.downstreamID, func(dc *downstreamConn) {
845 dc.SendMessage(&irc.Message{
846 Prefix: dc.srv.prefix(),
847 Command: irc.RPL_LIST,
848 Params: []string{dc.nick, dc.marshalEntity(uc.network, channel), clients, topic},
849 })
850 })
851 case irc.RPL_LISTEND:
852 ok := uc.endPendingLISTs(false)
853 if !ok {
854 return fmt.Errorf("unexpected RPL_LISTEND: no matching pending LIST")
855 }
856 case irc.RPL_NAMREPLY:
857 var name, statusStr, members string
858 if err := parseMessageParams(msg, nil, &statusStr, &name, &members); err != nil {
859 return err
860 }
861
862 ch, ok := uc.channels[name]
863 if !ok {
864 // NAMES on a channel we have not joined, forward to downstream
865 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
866 channel := dc.marshalEntity(uc.network, name)
867 members := splitSpace(members)
868 for i, member := range members {
869 membership, nick := uc.parseMembershipPrefix(member)
870 members[i] = membership.String() + dc.marshalEntity(uc.network, nick)
871 }
872 memberStr := strings.Join(members, " ")
873
874 dc.SendMessage(&irc.Message{
875 Prefix: dc.srv.prefix(),
876 Command: irc.RPL_NAMREPLY,
877 Params: []string{dc.nick, statusStr, channel, memberStr},
878 })
879 })
880 return nil
881 }
882
883 status, err := parseChannelStatus(statusStr)
884 if err != nil {
885 return err
886 }
887 ch.Status = status
888
889 for _, s := range splitSpace(members) {
890 membership, nick := uc.parseMembershipPrefix(s)
891 ch.Members[nick] = membership
892 }
893 case irc.RPL_ENDOFNAMES:
894 var name string
895 if err := parseMessageParams(msg, nil, &name); err != nil {
896 return err
897 }
898
899 ch, ok := uc.channels[name]
900 if !ok {
901 // NAMES on a channel we have not joined, forward to downstream
902 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
903 channel := dc.marshalEntity(uc.network, name)
904
905 dc.SendMessage(&irc.Message{
906 Prefix: dc.srv.prefix(),
907 Command: irc.RPL_ENDOFNAMES,
908 Params: []string{dc.nick, channel, "End of /NAMES list"},
909 })
910 })
911 return nil
912 }
913
914 if ch.complete {
915 return fmt.Errorf("received unexpected RPL_ENDOFNAMES")
916 }
917 ch.complete = true
918
919 uc.forEachDownstream(func(dc *downstreamConn) {
920 forwardChannel(dc, ch)
921 })
922 case irc.RPL_WHOREPLY:
923 var channel, username, host, server, nick, mode, trailing string
924 if err := parseMessageParams(msg, nil, &channel, &username, &host, &server, &nick, &mode, &trailing); err != nil {
925 return err
926 }
927
928 parts := strings.SplitN(trailing, " ", 2)
929 if len(parts) != 2 {
930 return fmt.Errorf("received malformed RPL_WHOREPLY: wrong trailing parameter: %s", trailing)
931 }
932 realname := parts[1]
933 hops, err := strconv.Atoi(parts[0])
934 if err != nil {
935 return fmt.Errorf("received malformed RPL_WHOREPLY: wrong hop count: %s", parts[0])
936 }
937 hops++
938
939 trailing = strconv.Itoa(hops) + " " + realname
940
941 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
942 channel := channel
943 if channel != "*" {
944 channel = dc.marshalEntity(uc.network, channel)
945 }
946 nick := dc.marshalEntity(uc.network, nick)
947 dc.SendMessage(&irc.Message{
948 Prefix: dc.srv.prefix(),
949 Command: irc.RPL_WHOREPLY,
950 Params: []string{dc.nick, channel, username, host, server, nick, mode, trailing},
951 })
952 })
953 case irc.RPL_ENDOFWHO:
954 var name string
955 if err := parseMessageParams(msg, nil, &name); err != nil {
956 return err
957 }
958
959 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
960 name := name
961 if name != "*" {
962 // TODO: support WHO masks
963 name = dc.marshalEntity(uc.network, name)
964 }
965 dc.SendMessage(&irc.Message{
966 Prefix: dc.srv.prefix(),
967 Command: irc.RPL_ENDOFWHO,
968 Params: []string{dc.nick, name, "End of /WHO list"},
969 })
970 })
971 case irc.RPL_WHOISUSER:
972 var nick, username, host, realname string
973 if err := parseMessageParams(msg, nil, &nick, &username, &host, nil, &realname); err != nil {
974 return err
975 }
976
977 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
978 nick := dc.marshalEntity(uc.network, nick)
979 dc.SendMessage(&irc.Message{
980 Prefix: dc.srv.prefix(),
981 Command: irc.RPL_WHOISUSER,
982 Params: []string{dc.nick, nick, username, host, "*", realname},
983 })
984 })
985 case irc.RPL_WHOISSERVER:
986 var nick, server, serverInfo string
987 if err := parseMessageParams(msg, nil, &nick, &server, &serverInfo); err != nil {
988 return err
989 }
990
991 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
992 nick := dc.marshalEntity(uc.network, nick)
993 dc.SendMessage(&irc.Message{
994 Prefix: dc.srv.prefix(),
995 Command: irc.RPL_WHOISSERVER,
996 Params: []string{dc.nick, nick, server, serverInfo},
997 })
998 })
999 case irc.RPL_WHOISOPERATOR:
1000 var nick string
1001 if err := parseMessageParams(msg, nil, &nick); err != nil {
1002 return err
1003 }
1004
1005 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1006 nick := dc.marshalEntity(uc.network, nick)
1007 dc.SendMessage(&irc.Message{
1008 Prefix: dc.srv.prefix(),
1009 Command: irc.RPL_WHOISOPERATOR,
1010 Params: []string{dc.nick, nick, "is an IRC operator"},
1011 })
1012 })
1013 case irc.RPL_WHOISIDLE:
1014 var nick string
1015 if err := parseMessageParams(msg, nil, &nick, nil); err != nil {
1016 return err
1017 }
1018
1019 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1020 nick := dc.marshalEntity(uc.network, nick)
1021 params := []string{dc.nick, nick}
1022 params = append(params, msg.Params[2:]...)
1023 dc.SendMessage(&irc.Message{
1024 Prefix: dc.srv.prefix(),
1025 Command: irc.RPL_WHOISIDLE,
1026 Params: params,
1027 })
1028 })
1029 case irc.RPL_WHOISCHANNELS:
1030 var nick, channelList string
1031 if err := parseMessageParams(msg, nil, &nick, &channelList); err != nil {
1032 return err
1033 }
1034 channels := splitSpace(channelList)
1035
1036 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1037 nick := dc.marshalEntity(uc.network, nick)
1038 channelList := make([]string, len(channels))
1039 for i, channel := range channels {
1040 prefix, channel := uc.parseMembershipPrefix(channel)
1041 channel = dc.marshalEntity(uc.network, channel)
1042 channelList[i] = prefix.String() + channel
1043 }
1044 channels := strings.Join(channelList, " ")
1045 dc.SendMessage(&irc.Message{
1046 Prefix: dc.srv.prefix(),
1047 Command: irc.RPL_WHOISCHANNELS,
1048 Params: []string{dc.nick, nick, channels},
1049 })
1050 })
1051 case irc.RPL_ENDOFWHOIS:
1052 var nick string
1053 if err := parseMessageParams(msg, nil, &nick); err != nil {
1054 return err
1055 }
1056
1057 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1058 nick := dc.marshalEntity(uc.network, nick)
1059 dc.SendMessage(&irc.Message{
1060 Prefix: dc.srv.prefix(),
1061 Command: irc.RPL_ENDOFWHOIS,
1062 Params: []string{dc.nick, nick, "End of /WHOIS list"},
1063 })
1064 })
1065 case "PRIVMSG":
1066 if msg.Prefix == nil {
1067 return fmt.Errorf("expected a prefix")
1068 }
1069
1070 var entity, text string
1071 if err := parseMessageParams(msg, &entity, &text); err != nil {
1072 return err
1073 }
1074
1075 if msg.Prefix.Name == serviceNick {
1076 uc.logger.Printf("skipping PRIVMSG from soju's service: %v", msg)
1077 break
1078 }
1079 if entity == serviceNick {
1080 uc.logger.Printf("skipping PRIVMSG to soju's service: %v", msg)
1081 break
1082 }
1083
1084 target := entity
1085 if target == uc.nick {
1086 target = msg.Prefix.Name
1087 }
1088 uc.produce(target, msg, nil)
1089 case "INVITE":
1090 var nick string
1091 var channel string
1092 if err := parseMessageParams(msg, &nick, &channel); err != nil {
1093 return err
1094 }
1095
1096 uc.forEachDownstream(func(dc *downstreamConn) {
1097 dc.SendMessage(&irc.Message{
1098 Prefix: dc.marshalUserPrefix(uc.network, msg.Prefix),
1099 Command: "INVITE",
1100 Params: []string{dc.marshalEntity(uc.network, nick), dc.marshalEntity(uc.network, channel)},
1101 })
1102 })
1103 case irc.RPL_INVITING:
1104 var nick string
1105 var channel string
1106 if err := parseMessageParams(msg, &nick, &channel); err != nil {
1107 return err
1108 }
1109
1110 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1111 dc.SendMessage(&irc.Message{
1112 Prefix: dc.srv.prefix(),
1113 Command: irc.RPL_INVITING,
1114 Params: []string{dc.nick, dc.marshalEntity(uc.network, nick), dc.marshalEntity(uc.network, channel)},
1115 })
1116 })
1117 case irc.ERR_UNKNOWNCOMMAND, irc.RPL_TRYAGAIN:
1118 var command, reason string
1119 if err := parseMessageParams(msg, nil, &command, &reason); err != nil {
1120 return err
1121 }
1122
1123 if command == "LIST" {
1124 ok := uc.endPendingLISTs(false)
1125 if !ok {
1126 return fmt.Errorf("unexpected response for LIST: %q: no matching pending LIST", msg.Command)
1127 }
1128 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1129 dc.SendMessage(&irc.Message{
1130 Prefix: uc.srv.prefix(),
1131 Command: msg.Command,
1132 Params: []string{dc.nick, "LIST", reason},
1133 })
1134 })
1135 }
1136 case "TAGMSG":
1137 // TODO: relay to downstream connections that accept message-tags
1138 case "ACK":
1139 // Ignore
1140 case irc.RPL_NOWAWAY, irc.RPL_UNAWAY:
1141 // Ignore
1142 case irc.RPL_YOURHOST, irc.RPL_CREATED:
1143 // Ignore
1144 case irc.RPL_LUSERCLIENT, irc.RPL_LUSEROP, irc.RPL_LUSERUNKNOWN, irc.RPL_LUSERCHANNELS, irc.RPL_LUSERME:
1145 // Ignore
1146 case irc.RPL_MOTDSTART, irc.RPL_MOTD, irc.RPL_ENDOFMOTD:
1147 // Ignore
1148 case irc.RPL_LISTSTART:
1149 // Ignore
1150 case rpl_localusers, rpl_globalusers:
1151 // Ignore
1152 case irc.RPL_STATSVLINE, rpl_statsping, irc.RPL_STATSBLINE, irc.RPL_STATSDLINE:
1153 // Ignore
1154 default:
1155 uc.logger.Printf("unhandled message: %v", msg)
1156 }
1157 return nil
1158}
1159
// splitSpace splits s around runs of the ASCII space character (U+0020)
// only. Unlike strings.Fields, other whitespace such as tabs is kept as part
// of the fields. Leading, trailing and repeated spaces yield no empty fields.
func splitSpace(s string) []string {
	isSpace := func(r rune) bool { return r == ' ' }
	return strings.FieldsFunc(s, isSpace)
}
1165
1166func (uc *upstreamConn) register() {
1167 uc.nick = uc.network.Nick
1168 uc.username = uc.network.Username
1169 if uc.username == "" {
1170 uc.username = uc.nick
1171 }
1172 uc.realname = uc.network.Realname
1173 if uc.realname == "" {
1174 uc.realname = uc.nick
1175 }
1176
1177 uc.SendMessage(&irc.Message{
1178 Command: "CAP",
1179 Params: []string{"LS", "302"},
1180 })
1181
1182 if uc.network.Pass != "" {
1183 uc.SendMessage(&irc.Message{
1184 Command: "PASS",
1185 Params: []string{uc.network.Pass},
1186 })
1187 }
1188
1189 uc.SendMessage(&irc.Message{
1190 Command: "NICK",
1191 Params: []string{uc.nick},
1192 })
1193 uc.SendMessage(&irc.Message{
1194 Command: "USER",
1195 Params: []string{uc.username, "0", "*", uc.realname},
1196 })
1197}
1198
1199func (uc *upstreamConn) runUntilRegistered() error {
1200 for !uc.registered {
1201 msg, err := uc.ReadMessage()
1202 if err != nil {
1203 return fmt.Errorf("failed to read message: %v", err)
1204 }
1205
1206 if err := uc.handleMessage(msg); err != nil {
1207 return fmt.Errorf("failed to handle message %q: %v", msg, err)
1208 }
1209 }
1210
1211 for _, command := range uc.network.ConnectCommands {
1212 m, err := irc.ParseMessage(command)
1213 if err != nil {
1214 uc.logger.Printf("failed to parse connect command %q: %v", command, err)
1215 } else {
1216 uc.SendMessage(m)
1217 }
1218 }
1219
1220 return nil
1221}
1222
1223func (uc *upstreamConn) requestSASL() bool {
1224 if uc.network.SASL.Mechanism == "" {
1225 return false
1226 }
1227
1228 v, ok := uc.caps["sasl"]
1229 if !ok {
1230 return false
1231 }
1232 if v != "" {
1233 mechanisms := strings.Split(v, ",")
1234 found := false
1235 for _, mech := range mechanisms {
1236 if strings.EqualFold(mech, uc.network.SASL.Mechanism) {
1237 found = true
1238 break
1239 }
1240 }
1241 if !found {
1242 return false
1243 }
1244 }
1245
1246 return true
1247}
1248
1249func (uc *upstreamConn) handleCapAck(name string, ok bool) error {
1250 auth := &uc.network.SASL
1251 switch name {
1252 case "sasl":
1253 if !ok {
1254 uc.logger.Printf("server refused to acknowledge the SASL capability")
1255 return nil
1256 }
1257
1258 switch auth.Mechanism {
1259 case "PLAIN":
1260 uc.logger.Printf("starting SASL PLAIN authentication with username %q", auth.Plain.Username)
1261 uc.saslClient = sasl.NewPlainClient("", auth.Plain.Username, auth.Plain.Password)
1262 default:
1263 return fmt.Errorf("unsupported SASL mechanism %q", name)
1264 }
1265
1266 uc.SendMessage(&irc.Message{
1267 Command: "AUTHENTICATE",
1268 Params: []string{auth.Mechanism},
1269 })
1270 case "message-tags":
1271 uc.tagsSupported = ok
1272 case "labeled-response":
1273 uc.labelsSupported = ok
1274 case "batch", "server-time":
1275 // Nothing to do
1276 default:
1277 uc.logger.Printf("received CAP ACK/NAK for a cap we don't support: %v", name)
1278 }
1279 return nil
1280}
1281
1282func (uc *upstreamConn) readMessages(ch chan<- event) error {
1283 for {
1284 msg, err := uc.ReadMessage()
1285 if err == io.EOF {
1286 break
1287 } else if err != nil {
1288 return fmt.Errorf("failed to read IRC command: %v", err)
1289 }
1290
1291 ch <- eventUpstreamMessage{msg, uc}
1292 }
1293
1294 return nil
1295}
1296
1297func (uc *upstreamConn) SendMessageLabeled(downstreamID uint64, msg *irc.Message) {
1298 if uc.labelsSupported {
1299 if msg.Tags == nil {
1300 msg.Tags = make(map[string]irc.TagValue)
1301 }
1302 msg.Tags["label"] = irc.TagValue(fmt.Sprintf("sd-%d-%d", downstreamID, uc.nextLabelID))
1303 uc.nextLabelID++
1304 }
1305 uc.SendMessage(msg)
1306}
1307
1308// TODO: handle moving logs when a network name changes, when support for this is added
1309func (uc *upstreamConn) appendLog(entity string, msg *irc.Message) {
1310 if uc.srv.LogPath == "" {
1311 return
1312 }
1313
1314 ml, ok := uc.messageLoggers[entity]
1315 if !ok {
1316 ml = newMessageLogger(uc.network, entity)
1317 uc.messageLoggers[entity] = ml
1318 }
1319
1320 if err := ml.Append(msg); err != nil {
1321 uc.logger.Printf("failed to log message: %v", err)
1322 }
1323}
1324
1325// appendHistory appends a message to the history. entity can be empty.
1326func (uc *upstreamConn) appendHistory(entity string, msg *irc.Message) {
1327 // If no client is offline, no need to append the message to the buffer
1328 if len(uc.network.offlineClients) == 0 {
1329 return
1330 }
1331
1332 history, ok := uc.network.history[entity]
1333 if !ok {
1334 history = &networkHistory{
1335 offlineClients: make(map[string]uint64),
1336 ring: NewRing(uc.srv.RingCap),
1337 }
1338 uc.network.history[entity] = history
1339
1340 for clientName, _ := range uc.network.offlineClients {
1341 history.offlineClients[clientName] = 0
1342 }
1343 }
1344
1345 history.ring.Produce(msg)
1346}
1347
1348// produce appends a message to the logs, adds it to the history and forwards
1349// it to connected downstream connections.
1350//
1351// If origin is not nil and origin doesn't support echo-message, the message is
1352// forwarded to all connections except origin.
1353func (uc *upstreamConn) produce(target string, msg *irc.Message, origin *downstreamConn) {
1354 if target != "" {
1355 uc.appendLog(target, msg)
1356 }
1357
1358 uc.appendHistory(target, msg)
1359
1360 uc.forEachDownstream(func(dc *downstreamConn) {
1361 if dc != origin || dc.caps["echo-message"] {
1362 dc.SendMessage(dc.marshalMessage(msg, uc.network))
1363 }
1364 })
1365}
1366
1367func (uc *upstreamConn) updateAway() {
1368 away := true
1369 uc.forEachDownstream(func(*downstreamConn) {
1370 away = false
1371 })
1372 if away == uc.away {
1373 return
1374 }
1375 if away {
1376 uc.SendMessage(&irc.Message{
1377 Command: "AWAY",
1378 Params: []string{"Auto away"},
1379 })
1380 } else {
1381 uc.SendMessage(&irc.Message{
1382 Command: "AWAY",
1383 })
1384 }
1385 uc.away = away
1386}
Note: See TracBrowser for help on using the repository browser.