source: code/trunk/upstream.go@ 274

Last change on this file since 274 was 274, checked in by contact, 5 years ago

Don't use forEachDownstreamByID when forwarding RPL_AWAY

We should broadcast the message, not send it to a specific downstream
connection.

File size: 34.5 KB
Line 
1package soju
2
3import (
4 "crypto/tls"
5 "encoding/base64"
6 "errors"
7 "fmt"
8 "io"
9 "net"
10 "strconv"
11 "strings"
12 "time"
13
14 "github.com/emersion/go-sasl"
15 "gopkg.in/irc.v3"
16)
17
18type upstreamChannel struct {
19 Name string
20 conn *upstreamConn
21 Topic string
22 TopicWho string
23 TopicTime time.Time
24 Status channelStatus
25 modes channelModes
26 creationTime string
27 Members map[string]*membership
28 complete bool
29}
30
31type upstreamConn struct {
32 conn
33
34 network *network
35 user *user
36
37 serverName string
38 availableUserModes string
39 availableChannelModes map[byte]channelModeType
40 availableChannelTypes string
41 availableMemberships []membership
42
43 registered bool
44 nick string
45 username string
46 realname string
47 modes userModes
48 channels map[string]*upstreamChannel
49 caps map[string]string
50 batches map[string]batch
51 away bool
52
53 tagsSupported bool
54 labelsSupported bool
55 nextLabelID uint64
56
57 saslClient sasl.Client
58 saslStarted bool
59
60 // set of LIST commands in progress, per downstream
61 pendingLISTDownstreamSet map[uint64]struct{}
62
63 messageLoggers map[string]*messageLogger
64}
65
66func connectToUpstream(network *network) (*upstreamConn, error) {
67 logger := &prefixLogger{network.user.srv.Logger, fmt.Sprintf("upstream %q: ", network.Addr)}
68
69 var scheme string
70 var addr string
71
72 addrParts := strings.SplitN(network.Addr, "://", 2)
73 if len(addrParts) == 2 {
74 scheme = addrParts[0]
75 addr = addrParts[1]
76 } else {
77 scheme = "ircs"
78 addr = addrParts[0]
79 }
80
81 dialer := net.Dialer{Timeout: connectTimeout}
82
83 var netConn net.Conn
84 var err error
85 switch scheme {
86 case "ircs":
87 if !strings.ContainsRune(addr, ':') {
88 addr = addr + ":6697"
89 }
90
91 logger.Printf("connecting to TLS server at address %q", addr)
92 netConn, err = tls.DialWithDialer(&dialer, "tcp", addr, nil)
93 case "irc+insecure":
94 if !strings.ContainsRune(addr, ':') {
95 addr = addr + ":6667"
96 }
97
98 logger.Printf("connecting to plain-text server at address %q", addr)
99 netConn, err = dialer.Dial("tcp", addr)
100 default:
101 return nil, fmt.Errorf("failed to dial %q: unknown scheme: %v", addr, scheme)
102 }
103 if err != nil {
104 return nil, fmt.Errorf("failed to dial %q: %v", addr, err)
105 }
106
107 uc := &upstreamConn{
108 conn: *newConn(network.user.srv, netConn, logger),
109 network: network,
110 user: network.user,
111 channels: make(map[string]*upstreamChannel),
112 caps: make(map[string]string),
113 batches: make(map[string]batch),
114 availableChannelTypes: stdChannelTypes,
115 availableChannelModes: stdChannelModes,
116 availableMemberships: stdMemberships,
117 pendingLISTDownstreamSet: make(map[uint64]struct{}),
118 messageLoggers: make(map[string]*messageLogger),
119 }
120
121 return uc, nil
122}
123
124func (uc *upstreamConn) forEachDownstream(f func(*downstreamConn)) {
125 uc.network.forEachDownstream(f)
126}
127
128func (uc *upstreamConn) forEachDownstreamByID(id uint64, f func(*downstreamConn)) {
129 uc.forEachDownstream(func(dc *downstreamConn) {
130 if id != 0 && id != dc.id {
131 return
132 }
133 f(dc)
134 })
135}
136
137func (uc *upstreamConn) getChannel(name string) (*upstreamChannel, error) {
138 ch, ok := uc.channels[name]
139 if !ok {
140 return nil, fmt.Errorf("unknown channel %q", name)
141 }
142 return ch, nil
143}
144
145func (uc *upstreamConn) isChannel(entity string) bool {
146 if i := strings.IndexByte(uc.availableChannelTypes, entity[0]); i >= 0 {
147 return true
148 }
149 return false
150}
151
152func (uc *upstreamConn) getPendingLIST() *pendingLIST {
153 for _, pl := range uc.user.pendingLISTs {
154 if _, ok := pl.pendingCommands[uc.network.ID]; !ok {
155 continue
156 }
157 return &pl
158 }
159 return nil
160}
161
162func (uc *upstreamConn) endPendingLISTs(all bool) (found bool) {
163 found = false
164 for i := 0; i < len(uc.user.pendingLISTs); i++ {
165 pl := uc.user.pendingLISTs[i]
166 if _, ok := pl.pendingCommands[uc.network.ID]; !ok {
167 continue
168 }
169 delete(pl.pendingCommands, uc.network.ID)
170 if len(pl.pendingCommands) == 0 {
171 uc.user.pendingLISTs = append(uc.user.pendingLISTs[:i], uc.user.pendingLISTs[i+1:]...)
172 i--
173 uc.forEachDownstreamByID(pl.downstreamID, func(dc *downstreamConn) {
174 dc.SendMessage(&irc.Message{
175 Prefix: dc.srv.prefix(),
176 Command: irc.RPL_LISTEND,
177 Params: []string{dc.nick, "End of /LIST"},
178 })
179 })
180 }
181 found = true
182 if !all {
183 delete(uc.pendingLISTDownstreamSet, pl.downstreamID)
184 uc.user.forEachUpstream(func(uc *upstreamConn) {
185 uc.trySendLIST(pl.downstreamID)
186 })
187 return
188 }
189 }
190 return
191}
192
193func (uc *upstreamConn) trySendLIST(downstreamID uint64) {
194 if _, ok := uc.pendingLISTDownstreamSet[downstreamID]; ok {
195 // a LIST command is already pending
196 // we will try again when that command is completed
197 return
198 }
199
200 for _, pl := range uc.user.pendingLISTs {
201 if pl.downstreamID != downstreamID {
202 continue
203 }
204 // this is the first pending LIST command list of the downstream
205 listCommand, ok := pl.pendingCommands[uc.network.ID]
206 if !ok {
207 // there is no command for this upstream in these LIST commands
208 // do not send anything
209 continue
210 }
211 // there is a command for this upstream in these LIST commands
212 // send it now
213
214 uc.SendMessageLabeled(downstreamID, listCommand)
215
216 uc.pendingLISTDownstreamSet[downstreamID] = struct{}{}
217 return
218 }
219}
220
221func (uc *upstreamConn) parseMembershipPrefix(s string) (membership *membership, nick string) {
222 for _, m := range uc.availableMemberships {
223 if m.Prefix == s[0] {
224 return &m, s[1:]
225 }
226 }
227 return nil, s
228}
229
230func (uc *upstreamConn) handleMessage(msg *irc.Message) error {
231 var label string
232 if l, ok := msg.GetTag("label"); ok {
233 label = l
234 }
235
236 var msgBatch *batch
237 if batchName, ok := msg.GetTag("batch"); ok {
238 b, ok := uc.batches[batchName]
239 if !ok {
240 return fmt.Errorf("unexpected batch reference: batch was not defined: %q", batchName)
241 }
242 msgBatch = &b
243 if label == "" {
244 label = msgBatch.Label
245 }
246 }
247
248 var downstreamID uint64 = 0
249 if label != "" {
250 var labelOffset uint64
251 n, err := fmt.Sscanf(label, "sd-%d-%d", &downstreamID, &labelOffset)
252 if err == nil && n < 2 {
253 err = errors.New("not enough arguments")
254 }
255 if err != nil {
256 return fmt.Errorf("unexpected message label: invalid downstream reference for label %q: %v", label, err)
257 }
258 }
259
260 if _, ok := msg.Tags["time"]; !ok {
261 msg.Tags["time"] = irc.TagValue(time.Now().UTC().Format(serverTimeLayout))
262 }
263
264 switch msg.Command {
265 case "PING":
266 uc.SendMessage(&irc.Message{
267 Command: "PONG",
268 Params: msg.Params,
269 })
270 return nil
271 case "NOTICE":
272 if msg.Prefix == nil {
273 return fmt.Errorf("expected a prefix")
274 }
275
276 if msg.Prefix.User == "" && msg.Prefix.Host == "" { // server message
277 uc.produce("", msg, nil)
278 } else { // regular user NOTICE
279 var entity, text string
280 if err := parseMessageParams(msg, &entity, &text); err != nil {
281 return err
282 }
283
284 target := entity
285 if target == uc.nick {
286 target = msg.Prefix.Name
287 }
288 uc.produce(target, msg, nil)
289 }
290 case "CAP":
291 var subCmd string
292 if err := parseMessageParams(msg, nil, &subCmd); err != nil {
293 return err
294 }
295 subCmd = strings.ToUpper(subCmd)
296 subParams := msg.Params[2:]
297 switch subCmd {
298 case "LS":
299 if len(subParams) < 1 {
300 return newNeedMoreParamsError(msg.Command)
301 }
302 caps := strings.Fields(subParams[len(subParams)-1])
303 more := len(subParams) >= 2 && msg.Params[len(subParams)-2] == "*"
304
305 for _, s := range caps {
306 kv := strings.SplitN(s, "=", 2)
307 k := strings.ToLower(kv[0])
308 var v string
309 if len(kv) == 2 {
310 v = kv[1]
311 }
312 uc.caps[k] = v
313 }
314
315 if more {
316 break // wait to receive all capabilities
317 }
318
319 requestCaps := make([]string, 0, 16)
320 for _, c := range []string{"message-tags", "batch", "labeled-response", "server-time"} {
321 if _, ok := uc.caps[c]; ok {
322 requestCaps = append(requestCaps, c)
323 }
324 }
325
326 if uc.requestSASL() {
327 requestCaps = append(requestCaps, "sasl")
328 }
329
330 if len(requestCaps) > 0 {
331 uc.SendMessage(&irc.Message{
332 Command: "CAP",
333 Params: []string{"REQ", strings.Join(requestCaps, " ")},
334 })
335 }
336
337 if uc.requestSASL() {
338 break // we'll send CAP END after authentication is completed
339 }
340
341 uc.SendMessage(&irc.Message{
342 Command: "CAP",
343 Params: []string{"END"},
344 })
345 case "ACK", "NAK":
346 if len(subParams) < 1 {
347 return newNeedMoreParamsError(msg.Command)
348 }
349 caps := strings.Fields(subParams[0])
350
351 for _, name := range caps {
352 if err := uc.handleCapAck(strings.ToLower(name), subCmd == "ACK"); err != nil {
353 return err
354 }
355 }
356
357 if uc.saslClient == nil {
358 uc.SendMessage(&irc.Message{
359 Command: "CAP",
360 Params: []string{"END"},
361 })
362 }
363 default:
364 uc.logger.Printf("unhandled message: %v", msg)
365 }
366 case "AUTHENTICATE":
367 if uc.saslClient == nil {
368 return fmt.Errorf("received unexpected AUTHENTICATE message")
369 }
370
371 // TODO: if a challenge is 400 bytes long, buffer it
372 var challengeStr string
373 if err := parseMessageParams(msg, &challengeStr); err != nil {
374 uc.SendMessage(&irc.Message{
375 Command: "AUTHENTICATE",
376 Params: []string{"*"},
377 })
378 return err
379 }
380
381 var challenge []byte
382 if challengeStr != "+" {
383 var err error
384 challenge, err = base64.StdEncoding.DecodeString(challengeStr)
385 if err != nil {
386 uc.SendMessage(&irc.Message{
387 Command: "AUTHENTICATE",
388 Params: []string{"*"},
389 })
390 return err
391 }
392 }
393
394 var resp []byte
395 var err error
396 if !uc.saslStarted {
397 _, resp, err = uc.saslClient.Start()
398 uc.saslStarted = true
399 } else {
400 resp, err = uc.saslClient.Next(challenge)
401 }
402 if err != nil {
403 uc.SendMessage(&irc.Message{
404 Command: "AUTHENTICATE",
405 Params: []string{"*"},
406 })
407 return err
408 }
409
410 // TODO: send response in multiple chunks if >= 400 bytes
411 var respStr = "+"
412 if resp != nil {
413 respStr = base64.StdEncoding.EncodeToString(resp)
414 }
415
416 uc.SendMessage(&irc.Message{
417 Command: "AUTHENTICATE",
418 Params: []string{respStr},
419 })
420 case irc.RPL_LOGGEDIN:
421 var account string
422 if err := parseMessageParams(msg, nil, nil, &account); err != nil {
423 return err
424 }
425 uc.logger.Printf("logged in with account %q", account)
426 case irc.RPL_LOGGEDOUT:
427 uc.logger.Printf("logged out")
428 case irc.ERR_NICKLOCKED, irc.RPL_SASLSUCCESS, irc.ERR_SASLFAIL, irc.ERR_SASLTOOLONG, irc.ERR_SASLABORTED:
429 var info string
430 if err := parseMessageParams(msg, nil, &info); err != nil {
431 return err
432 }
433 switch msg.Command {
434 case irc.ERR_NICKLOCKED:
435 uc.logger.Printf("invalid nick used with SASL authentication: %v", info)
436 case irc.ERR_SASLFAIL:
437 uc.logger.Printf("SASL authentication failed: %v", info)
438 case irc.ERR_SASLTOOLONG:
439 uc.logger.Printf("SASL message too long: %v", info)
440 }
441
442 uc.saslClient = nil
443 uc.saslStarted = false
444
445 uc.SendMessage(&irc.Message{
446 Command: "CAP",
447 Params: []string{"END"},
448 })
449 case irc.RPL_WELCOME:
450 uc.registered = true
451 uc.logger.Printf("connection registered")
452
453 for _, ch := range uc.network.channels {
454 params := []string{ch.Name}
455 if ch.Key != "" {
456 params = append(params, ch.Key)
457 }
458 uc.SendMessage(&irc.Message{
459 Command: "JOIN",
460 Params: params,
461 })
462 }
463 case irc.RPL_MYINFO:
464 if err := parseMessageParams(msg, nil, &uc.serverName, nil, &uc.availableUserModes, nil); err != nil {
465 return err
466 }
467 case irc.RPL_ISUPPORT:
468 if err := parseMessageParams(msg, nil, nil); err != nil {
469 return err
470 }
471 for _, token := range msg.Params[1 : len(msg.Params)-1] {
472 negate := false
473 parameter := token
474 value := ""
475 if strings.HasPrefix(token, "-") {
476 negate = true
477 token = token[1:]
478 } else {
479 if i := strings.IndexByte(token, '='); i >= 0 {
480 parameter = token[:i]
481 value = token[i+1:]
482 }
483 }
484 if !negate {
485 switch parameter {
486 case "CHANMODES":
487 parts := strings.SplitN(value, ",", 5)
488 if len(parts) < 4 {
489 return fmt.Errorf("malformed ISUPPORT CHANMODES value: %v", value)
490 }
491 modes := make(map[byte]channelModeType)
492 for i, mt := range []channelModeType{modeTypeA, modeTypeB, modeTypeC, modeTypeD} {
493 for j := 0; j < len(parts[i]); j++ {
494 mode := parts[i][j]
495 modes[mode] = mt
496 }
497 }
498 uc.availableChannelModes = modes
499 case "CHANTYPES":
500 uc.availableChannelTypes = value
501 case "PREFIX":
502 if value == "" {
503 uc.availableMemberships = nil
504 } else {
505 if value[0] != '(' {
506 return fmt.Errorf("malformed ISUPPORT PREFIX value: %v", value)
507 }
508 sep := strings.IndexByte(value, ')')
509 if sep < 0 || len(value) != sep*2 {
510 return fmt.Errorf("malformed ISUPPORT PREFIX value: %v", value)
511 }
512 memberships := make([]membership, len(value)/2-1)
513 for i := range memberships {
514 memberships[i] = membership{
515 Mode: value[i+1],
516 Prefix: value[sep+i+1],
517 }
518 }
519 uc.availableMemberships = memberships
520 }
521 }
522 } else {
523 // TODO: handle ISUPPORT negations
524 }
525 }
526 case "BATCH":
527 var tag string
528 if err := parseMessageParams(msg, &tag); err != nil {
529 return err
530 }
531
532 if strings.HasPrefix(tag, "+") {
533 tag = tag[1:]
534 if _, ok := uc.batches[tag]; ok {
535 return fmt.Errorf("unexpected BATCH reference tag: batch was already defined: %q", tag)
536 }
537 var batchType string
538 if err := parseMessageParams(msg, nil, &batchType); err != nil {
539 return err
540 }
541 label := label
542 if label == "" && msgBatch != nil {
543 label = msgBatch.Label
544 }
545 uc.batches[tag] = batch{
546 Type: batchType,
547 Params: msg.Params[2:],
548 Outer: msgBatch,
549 Label: label,
550 }
551 } else if strings.HasPrefix(tag, "-") {
552 tag = tag[1:]
553 if _, ok := uc.batches[tag]; !ok {
554 return fmt.Errorf("unknown BATCH reference tag: %q", tag)
555 }
556 delete(uc.batches, tag)
557 } else {
558 return fmt.Errorf("unexpected BATCH reference tag: missing +/- prefix: %q", tag)
559 }
560 case "NICK":
561 if msg.Prefix == nil {
562 return fmt.Errorf("expected a prefix")
563 }
564
565 var newNick string
566 if err := parseMessageParams(msg, &newNick); err != nil {
567 return err
568 }
569
570 me := false
571 if msg.Prefix.Name == uc.nick {
572 uc.logger.Printf("changed nick from %q to %q", uc.nick, newNick)
573 me = true
574 uc.nick = newNick
575 }
576
577 for _, ch := range uc.channels {
578 if membership, ok := ch.Members[msg.Prefix.Name]; ok {
579 delete(ch.Members, msg.Prefix.Name)
580 ch.Members[newNick] = membership
581 uc.appendLog(ch.Name, msg)
582 uc.appendHistory(ch.Name, msg)
583 }
584 }
585
586 if !me {
587 uc.forEachDownstream(func(dc *downstreamConn) {
588 dc.SendMessage(dc.marshalMessage(msg, uc.network))
589 })
590 }
591 case "JOIN":
592 if msg.Prefix == nil {
593 return fmt.Errorf("expected a prefix")
594 }
595
596 var channels string
597 if err := parseMessageParams(msg, &channels); err != nil {
598 return err
599 }
600
601 for _, ch := range strings.Split(channels, ",") {
602 if msg.Prefix.Name == uc.nick {
603 uc.logger.Printf("joined channel %q", ch)
604 uc.channels[ch] = &upstreamChannel{
605 Name: ch,
606 conn: uc,
607 Members: make(map[string]*membership),
608 }
609
610 uc.SendMessage(&irc.Message{
611 Command: "MODE",
612 Params: []string{ch},
613 })
614 } else {
615 ch, err := uc.getChannel(ch)
616 if err != nil {
617 return err
618 }
619 ch.Members[msg.Prefix.Name] = nil
620 }
621
622 chMsg := msg.Copy()
623 chMsg.Params[0] = ch
624 uc.produce(ch, chMsg, nil)
625 }
626 case "PART":
627 if msg.Prefix == nil {
628 return fmt.Errorf("expected a prefix")
629 }
630
631 var channels string
632 if err := parseMessageParams(msg, &channels); err != nil {
633 return err
634 }
635
636 for _, ch := range strings.Split(channels, ",") {
637 if msg.Prefix.Name == uc.nick {
638 uc.logger.Printf("parted channel %q", ch)
639 delete(uc.channels, ch)
640 } else {
641 ch, err := uc.getChannel(ch)
642 if err != nil {
643 return err
644 }
645 delete(ch.Members, msg.Prefix.Name)
646 }
647
648 chMsg := msg.Copy()
649 chMsg.Params[0] = ch
650 uc.produce(ch, chMsg, nil)
651 }
652 case "KICK":
653 if msg.Prefix == nil {
654 return fmt.Errorf("expected a prefix")
655 }
656
657 var channel, user string
658 if err := parseMessageParams(msg, &channel, &user); err != nil {
659 return err
660 }
661
662 if user == uc.nick {
663 uc.logger.Printf("kicked from channel %q by %s", channel, msg.Prefix.Name)
664 delete(uc.channels, channel)
665 } else {
666 ch, err := uc.getChannel(channel)
667 if err != nil {
668 return err
669 }
670 delete(ch.Members, user)
671 }
672
673 uc.produce(channel, msg, nil)
674 case "QUIT":
675 if msg.Prefix == nil {
676 return fmt.Errorf("expected a prefix")
677 }
678
679 if msg.Prefix.Name == uc.nick {
680 uc.logger.Printf("quit")
681 }
682
683 for _, ch := range uc.channels {
684 if _, ok := ch.Members[msg.Prefix.Name]; ok {
685 delete(ch.Members, msg.Prefix.Name)
686
687 uc.appendLog(ch.Name, msg)
688 uc.appendHistory(ch.Name, msg)
689 }
690 }
691
692 if msg.Prefix.Name != uc.nick {
693 uc.forEachDownstream(func(dc *downstreamConn) {
694 dc.SendMessage(dc.marshalMessage(msg, uc.network))
695 })
696 }
697 case irc.RPL_TOPIC, irc.RPL_NOTOPIC:
698 var name, topic string
699 if err := parseMessageParams(msg, nil, &name, &topic); err != nil {
700 return err
701 }
702 ch, err := uc.getChannel(name)
703 if err != nil {
704 return err
705 }
706 if msg.Command == irc.RPL_TOPIC {
707 ch.Topic = topic
708 } else {
709 ch.Topic = ""
710 }
711 case "TOPIC":
712 var name string
713 if err := parseMessageParams(msg, &name); err != nil {
714 return err
715 }
716 ch, err := uc.getChannel(name)
717 if err != nil {
718 return err
719 }
720 if len(msg.Params) > 1 {
721 ch.Topic = msg.Params[1]
722 } else {
723 ch.Topic = ""
724 }
725 uc.produce(ch.Name, msg, nil)
726 case "MODE":
727 var name, modeStr string
728 if err := parseMessageParams(msg, &name, &modeStr); err != nil {
729 return err
730 }
731
732 if !uc.isChannel(name) { // user mode change
733 if name != uc.nick {
734 return fmt.Errorf("received MODE message for unknown nick %q", name)
735 }
736 return uc.modes.Apply(modeStr)
737 // TODO: notify downstreams about user mode change?
738 } else { // channel mode change
739 ch, err := uc.getChannel(name)
740 if err != nil {
741 return err
742 }
743
744 if ch.modes != nil {
745 if err := ch.modes.Apply(uc.availableChannelModes, modeStr, msg.Params[2:]...); err != nil {
746 return err
747 }
748 }
749
750 uc.produce(ch.Name, msg, nil)
751 }
752 case irc.RPL_UMODEIS:
753 if err := parseMessageParams(msg, nil); err != nil {
754 return err
755 }
756 modeStr := ""
757 if len(msg.Params) > 1 {
758 modeStr = msg.Params[1]
759 }
760
761 uc.modes = ""
762 if err := uc.modes.Apply(modeStr); err != nil {
763 return err
764 }
765 // TODO: send RPL_UMODEIS to downstream connections when applicable
766 case irc.RPL_CHANNELMODEIS:
767 var channel string
768 if err := parseMessageParams(msg, nil, &channel); err != nil {
769 return err
770 }
771 modeStr := ""
772 if len(msg.Params) > 2 {
773 modeStr = msg.Params[2]
774 }
775
776 ch, err := uc.getChannel(channel)
777 if err != nil {
778 return err
779 }
780
781 firstMode := ch.modes == nil
782 ch.modes = make(map[byte]string)
783 if err := ch.modes.Apply(uc.availableChannelModes, modeStr, msg.Params[3:]...); err != nil {
784 return err
785 }
786 if firstMode {
787 modeStr, modeParams := ch.modes.Format()
788
789 uc.forEachDownstream(func(dc *downstreamConn) {
790 params := []string{dc.nick, dc.marshalEntity(uc.network, channel), modeStr}
791 params = append(params, modeParams...)
792
793 dc.SendMessage(&irc.Message{
794 Prefix: dc.srv.prefix(),
795 Command: irc.RPL_CHANNELMODEIS,
796 Params: params,
797 })
798 })
799 }
800 case rpl_creationtime:
801 var channel, creationTime string
802 if err := parseMessageParams(msg, nil, &channel, &creationTime); err != nil {
803 return err
804 }
805
806 ch, err := uc.getChannel(channel)
807 if err != nil {
808 return err
809 }
810
811 firstCreationTime := ch.creationTime == ""
812 ch.creationTime = creationTime
813 if firstCreationTime {
814 uc.forEachDownstream(func(dc *downstreamConn) {
815 dc.SendMessage(&irc.Message{
816 Prefix: dc.srv.prefix(),
817 Command: rpl_creationtime,
818 Params: []string{dc.nick, channel, creationTime},
819 })
820 })
821 }
822 case rpl_topicwhotime:
823 var name, who, timeStr string
824 if err := parseMessageParams(msg, nil, &name, &who, &timeStr); err != nil {
825 return err
826 }
827 ch, err := uc.getChannel(name)
828 if err != nil {
829 return err
830 }
831 ch.TopicWho = who
832 sec, err := strconv.ParseInt(timeStr, 10, 64)
833 if err != nil {
834 return fmt.Errorf("failed to parse topic time: %v", err)
835 }
836 ch.TopicTime = time.Unix(sec, 0)
837 case irc.RPL_LIST:
838 var channel, clients, topic string
839 if err := parseMessageParams(msg, nil, &channel, &clients, &topic); err != nil {
840 return err
841 }
842
843 pl := uc.getPendingLIST()
844 if pl == nil {
845 return fmt.Errorf("unexpected RPL_LIST: no matching pending LIST")
846 }
847
848 uc.forEachDownstreamByID(pl.downstreamID, func(dc *downstreamConn) {
849 dc.SendMessage(&irc.Message{
850 Prefix: dc.srv.prefix(),
851 Command: irc.RPL_LIST,
852 Params: []string{dc.nick, dc.marshalEntity(uc.network, channel), clients, topic},
853 })
854 })
855 case irc.RPL_LISTEND:
856 ok := uc.endPendingLISTs(false)
857 if !ok {
858 return fmt.Errorf("unexpected RPL_LISTEND: no matching pending LIST")
859 }
860 case irc.RPL_NAMREPLY:
861 var name, statusStr, members string
862 if err := parseMessageParams(msg, nil, &statusStr, &name, &members); err != nil {
863 return err
864 }
865
866 ch, ok := uc.channels[name]
867 if !ok {
868 // NAMES on a channel we have not joined, forward to downstream
869 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
870 channel := dc.marshalEntity(uc.network, name)
871 members := splitSpace(members)
872 for i, member := range members {
873 membership, nick := uc.parseMembershipPrefix(member)
874 members[i] = membership.String() + dc.marshalEntity(uc.network, nick)
875 }
876 memberStr := strings.Join(members, " ")
877
878 dc.SendMessage(&irc.Message{
879 Prefix: dc.srv.prefix(),
880 Command: irc.RPL_NAMREPLY,
881 Params: []string{dc.nick, statusStr, channel, memberStr},
882 })
883 })
884 return nil
885 }
886
887 status, err := parseChannelStatus(statusStr)
888 if err != nil {
889 return err
890 }
891 ch.Status = status
892
893 for _, s := range splitSpace(members) {
894 membership, nick := uc.parseMembershipPrefix(s)
895 ch.Members[nick] = membership
896 }
897 case irc.RPL_ENDOFNAMES:
898 var name string
899 if err := parseMessageParams(msg, nil, &name); err != nil {
900 return err
901 }
902
903 ch, ok := uc.channels[name]
904 if !ok {
905 // NAMES on a channel we have not joined, forward to downstream
906 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
907 channel := dc.marshalEntity(uc.network, name)
908
909 dc.SendMessage(&irc.Message{
910 Prefix: dc.srv.prefix(),
911 Command: irc.RPL_ENDOFNAMES,
912 Params: []string{dc.nick, channel, "End of /NAMES list"},
913 })
914 })
915 return nil
916 }
917
918 if ch.complete {
919 return fmt.Errorf("received unexpected RPL_ENDOFNAMES")
920 }
921 ch.complete = true
922
923 uc.forEachDownstream(func(dc *downstreamConn) {
924 forwardChannel(dc, ch)
925 })
926 case irc.RPL_WHOREPLY:
927 var channel, username, host, server, nick, mode, trailing string
928 if err := parseMessageParams(msg, nil, &channel, &username, &host, &server, &nick, &mode, &trailing); err != nil {
929 return err
930 }
931
932 parts := strings.SplitN(trailing, " ", 2)
933 if len(parts) != 2 {
934 return fmt.Errorf("received malformed RPL_WHOREPLY: wrong trailing parameter: %s", trailing)
935 }
936 realname := parts[1]
937 hops, err := strconv.Atoi(parts[0])
938 if err != nil {
939 return fmt.Errorf("received malformed RPL_WHOREPLY: wrong hop count: %s", parts[0])
940 }
941 hops++
942
943 trailing = strconv.Itoa(hops) + " " + realname
944
945 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
946 channel := channel
947 if channel != "*" {
948 channel = dc.marshalEntity(uc.network, channel)
949 }
950 nick := dc.marshalEntity(uc.network, nick)
951 dc.SendMessage(&irc.Message{
952 Prefix: dc.srv.prefix(),
953 Command: irc.RPL_WHOREPLY,
954 Params: []string{dc.nick, channel, username, host, server, nick, mode, trailing},
955 })
956 })
957 case irc.RPL_ENDOFWHO:
958 var name string
959 if err := parseMessageParams(msg, nil, &name); err != nil {
960 return err
961 }
962
963 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
964 name := name
965 if name != "*" {
966 // TODO: support WHO masks
967 name = dc.marshalEntity(uc.network, name)
968 }
969 dc.SendMessage(&irc.Message{
970 Prefix: dc.srv.prefix(),
971 Command: irc.RPL_ENDOFWHO,
972 Params: []string{dc.nick, name, "End of /WHO list"},
973 })
974 })
975 case irc.RPL_WHOISUSER:
976 var nick, username, host, realname string
977 if err := parseMessageParams(msg, nil, &nick, &username, &host, nil, &realname); err != nil {
978 return err
979 }
980
981 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
982 nick := dc.marshalEntity(uc.network, nick)
983 dc.SendMessage(&irc.Message{
984 Prefix: dc.srv.prefix(),
985 Command: irc.RPL_WHOISUSER,
986 Params: []string{dc.nick, nick, username, host, "*", realname},
987 })
988 })
989 case irc.RPL_WHOISSERVER:
990 var nick, server, serverInfo string
991 if err := parseMessageParams(msg, nil, &nick, &server, &serverInfo); err != nil {
992 return err
993 }
994
995 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
996 nick := dc.marshalEntity(uc.network, nick)
997 dc.SendMessage(&irc.Message{
998 Prefix: dc.srv.prefix(),
999 Command: irc.RPL_WHOISSERVER,
1000 Params: []string{dc.nick, nick, server, serverInfo},
1001 })
1002 })
1003 case irc.RPL_WHOISOPERATOR:
1004 var nick string
1005 if err := parseMessageParams(msg, nil, &nick); err != nil {
1006 return err
1007 }
1008
1009 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1010 nick := dc.marshalEntity(uc.network, nick)
1011 dc.SendMessage(&irc.Message{
1012 Prefix: dc.srv.prefix(),
1013 Command: irc.RPL_WHOISOPERATOR,
1014 Params: []string{dc.nick, nick, "is an IRC operator"},
1015 })
1016 })
1017 case irc.RPL_WHOISIDLE:
1018 var nick string
1019 if err := parseMessageParams(msg, nil, &nick, nil); err != nil {
1020 return err
1021 }
1022
1023 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1024 nick := dc.marshalEntity(uc.network, nick)
1025 params := []string{dc.nick, nick}
1026 params = append(params, msg.Params[2:]...)
1027 dc.SendMessage(&irc.Message{
1028 Prefix: dc.srv.prefix(),
1029 Command: irc.RPL_WHOISIDLE,
1030 Params: params,
1031 })
1032 })
1033 case irc.RPL_WHOISCHANNELS:
1034 var nick, channelList string
1035 if err := parseMessageParams(msg, nil, &nick, &channelList); err != nil {
1036 return err
1037 }
1038 channels := splitSpace(channelList)
1039
1040 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1041 nick := dc.marshalEntity(uc.network, nick)
1042 channelList := make([]string, len(channels))
1043 for i, channel := range channels {
1044 prefix, channel := uc.parseMembershipPrefix(channel)
1045 channel = dc.marshalEntity(uc.network, channel)
1046 channelList[i] = prefix.String() + channel
1047 }
1048 channels := strings.Join(channelList, " ")
1049 dc.SendMessage(&irc.Message{
1050 Prefix: dc.srv.prefix(),
1051 Command: irc.RPL_WHOISCHANNELS,
1052 Params: []string{dc.nick, nick, channels},
1053 })
1054 })
1055 case irc.RPL_ENDOFWHOIS:
1056 var nick string
1057 if err := parseMessageParams(msg, nil, &nick); err != nil {
1058 return err
1059 }
1060
1061 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1062 nick := dc.marshalEntity(uc.network, nick)
1063 dc.SendMessage(&irc.Message{
1064 Prefix: dc.srv.prefix(),
1065 Command: irc.RPL_ENDOFWHOIS,
1066 Params: []string{dc.nick, nick, "End of /WHOIS list"},
1067 })
1068 })
1069 case "PRIVMSG":
1070 if msg.Prefix == nil {
1071 return fmt.Errorf("expected a prefix")
1072 }
1073
1074 var entity, text string
1075 if err := parseMessageParams(msg, &entity, &text); err != nil {
1076 return err
1077 }
1078
1079 if msg.Prefix.Name == serviceNick {
1080 uc.logger.Printf("skipping PRIVMSG from soju's service: %v", msg)
1081 break
1082 }
1083 if entity == serviceNick {
1084 uc.logger.Printf("skipping PRIVMSG to soju's service: %v", msg)
1085 break
1086 }
1087
1088 target := entity
1089 if target == uc.nick {
1090 target = msg.Prefix.Name
1091 }
1092 uc.produce(target, msg, nil)
1093 case "INVITE":
1094 var nick, channel string
1095 if err := parseMessageParams(msg, &nick, &channel); err != nil {
1096 return err
1097 }
1098
1099 uc.forEachDownstream(func(dc *downstreamConn) {
1100 dc.SendMessage(&irc.Message{
1101 Prefix: dc.marshalUserPrefix(uc.network, msg.Prefix),
1102 Command: "INVITE",
1103 Params: []string{dc.marshalEntity(uc.network, nick), dc.marshalEntity(uc.network, channel)},
1104 })
1105 })
1106 case irc.RPL_INVITING:
1107 var nick, channel string
1108 if err := parseMessageParams(msg, &nick, &channel); err != nil {
1109 return err
1110 }
1111
1112 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1113 dc.SendMessage(&irc.Message{
1114 Prefix: dc.srv.prefix(),
1115 Command: irc.RPL_INVITING,
1116 Params: []string{dc.nick, dc.marshalEntity(uc.network, nick), dc.marshalEntity(uc.network, channel)},
1117 })
1118 })
1119 case irc.ERR_UNKNOWNCOMMAND, irc.RPL_TRYAGAIN:
1120 var command, reason string
1121 if err := parseMessageParams(msg, nil, &command, &reason); err != nil {
1122 return err
1123 }
1124
1125 if command == "LIST" {
1126 ok := uc.endPendingLISTs(false)
1127 if !ok {
1128 return fmt.Errorf("unexpected response for LIST: %q: no matching pending LIST", msg.Command)
1129 }
1130 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1131 dc.SendMessage(&irc.Message{
1132 Prefix: uc.srv.prefix(),
1133 Command: msg.Command,
1134 Params: []string{dc.nick, "LIST", reason},
1135 })
1136 })
1137 }
1138 case irc.RPL_AWAY:
1139 var nick, reason string
1140 if err := parseMessageParams(msg, nil, &nick, &reason); err != nil {
1141 return err
1142 }
1143
1144 uc.forEachDownstream(func(dc *downstreamConn) {
1145 dc.SendMessage(&irc.Message{
1146 Prefix: dc.srv.prefix(),
1147 Command: irc.RPL_AWAY,
1148 Params: []string{dc.nick, dc.marshalEntity(uc.network, nick), reason},
1149 })
1150 })
1151 case "TAGMSG":
1152 // TODO: relay to downstream connections that accept message-tags
1153 case "ACK":
1154 // Ignore
1155 case irc.RPL_NOWAWAY, irc.RPL_UNAWAY:
1156 // Ignore
1157 case irc.RPL_YOURHOST, irc.RPL_CREATED:
1158 // Ignore
1159 case irc.RPL_LUSERCLIENT, irc.RPL_LUSEROP, irc.RPL_LUSERUNKNOWN, irc.RPL_LUSERCHANNELS, irc.RPL_LUSERME:
1160 // Ignore
1161 case irc.RPL_MOTDSTART, irc.RPL_MOTD, irc.RPL_ENDOFMOTD:
1162 // Ignore
1163 case irc.RPL_LISTSTART:
1164 // Ignore
1165 case rpl_localusers, rpl_globalusers:
1166 // Ignore
1167 case irc.RPL_STATSVLINE, rpl_statsping, irc.RPL_STATSBLINE, irc.RPL_STATSDLINE:
1168 // Ignore
1169 default:
1170 uc.logger.Printf("unhandled message: %v", msg)
1171 }
1172 return nil
1173}
1174
// splitSpace splits s into its space-separated fields.
//
// Only the ASCII space character (' ') is treated as a separator; other
// whitespace such as tabs is kept inside fields. Runs of consecutive spaces
// are collapsed and leading/trailing spaces are dropped, so the result never
// contains an empty field.
func splitSpace(s string) []string {
	isSpace := func(c rune) bool { return c == ' ' }
	return strings.FieldsFunc(s, isSpace)
}
1180
1181func (uc *upstreamConn) register() {
1182 uc.nick = uc.network.Nick
1183 uc.username = uc.network.Username
1184 if uc.username == "" {
1185 uc.username = uc.nick
1186 }
1187 uc.realname = uc.network.Realname
1188 if uc.realname == "" {
1189 uc.realname = uc.nick
1190 }
1191
1192 uc.SendMessage(&irc.Message{
1193 Command: "CAP",
1194 Params: []string{"LS", "302"},
1195 })
1196
1197 if uc.network.Pass != "" {
1198 uc.SendMessage(&irc.Message{
1199 Command: "PASS",
1200 Params: []string{uc.network.Pass},
1201 })
1202 }
1203
1204 uc.SendMessage(&irc.Message{
1205 Command: "NICK",
1206 Params: []string{uc.nick},
1207 })
1208 uc.SendMessage(&irc.Message{
1209 Command: "USER",
1210 Params: []string{uc.username, "0", "*", uc.realname},
1211 })
1212}
1213
1214func (uc *upstreamConn) runUntilRegistered() error {
1215 for !uc.registered {
1216 msg, err := uc.ReadMessage()
1217 if err != nil {
1218 return fmt.Errorf("failed to read message: %v", err)
1219 }
1220
1221 if err := uc.handleMessage(msg); err != nil {
1222 return fmt.Errorf("failed to handle message %q: %v", msg, err)
1223 }
1224 }
1225
1226 for _, command := range uc.network.ConnectCommands {
1227 m, err := irc.ParseMessage(command)
1228 if err != nil {
1229 uc.logger.Printf("failed to parse connect command %q: %v", command, err)
1230 } else {
1231 uc.SendMessage(m)
1232 }
1233 }
1234
1235 return nil
1236}
1237
1238func (uc *upstreamConn) requestSASL() bool {
1239 if uc.network.SASL.Mechanism == "" {
1240 return false
1241 }
1242
1243 v, ok := uc.caps["sasl"]
1244 if !ok {
1245 return false
1246 }
1247 if v != "" {
1248 mechanisms := strings.Split(v, ",")
1249 found := false
1250 for _, mech := range mechanisms {
1251 if strings.EqualFold(mech, uc.network.SASL.Mechanism) {
1252 found = true
1253 break
1254 }
1255 }
1256 if !found {
1257 return false
1258 }
1259 }
1260
1261 return true
1262}
1263
1264func (uc *upstreamConn) handleCapAck(name string, ok bool) error {
1265 auth := &uc.network.SASL
1266 switch name {
1267 case "sasl":
1268 if !ok {
1269 uc.logger.Printf("server refused to acknowledge the SASL capability")
1270 return nil
1271 }
1272
1273 switch auth.Mechanism {
1274 case "PLAIN":
1275 uc.logger.Printf("starting SASL PLAIN authentication with username %q", auth.Plain.Username)
1276 uc.saslClient = sasl.NewPlainClient("", auth.Plain.Username, auth.Plain.Password)
1277 default:
1278 return fmt.Errorf("unsupported SASL mechanism %q", name)
1279 }
1280
1281 uc.SendMessage(&irc.Message{
1282 Command: "AUTHENTICATE",
1283 Params: []string{auth.Mechanism},
1284 })
1285 case "message-tags":
1286 uc.tagsSupported = ok
1287 case "labeled-response":
1288 uc.labelsSupported = ok
1289 case "batch", "server-time":
1290 // Nothing to do
1291 default:
1292 uc.logger.Printf("received CAP ACK/NAK for a cap we don't support: %v", name)
1293 }
1294 return nil
1295}
1296
1297func (uc *upstreamConn) readMessages(ch chan<- event) error {
1298 for {
1299 msg, err := uc.ReadMessage()
1300 if err == io.EOF {
1301 break
1302 } else if err != nil {
1303 return fmt.Errorf("failed to read IRC command: %v", err)
1304 }
1305
1306 ch <- eventUpstreamMessage{msg, uc}
1307 }
1308
1309 return nil
1310}
1311
1312func (uc *upstreamConn) SendMessageLabeled(downstreamID uint64, msg *irc.Message) {
1313 if uc.labelsSupported {
1314 if msg.Tags == nil {
1315 msg.Tags = make(map[string]irc.TagValue)
1316 }
1317 msg.Tags["label"] = irc.TagValue(fmt.Sprintf("sd-%d-%d", downstreamID, uc.nextLabelID))
1318 uc.nextLabelID++
1319 }
1320 uc.SendMessage(msg)
1321}
1322
1323// TODO: handle moving logs when a network name changes, when support for this is added
1324func (uc *upstreamConn) appendLog(entity string, msg *irc.Message) {
1325 if uc.srv.LogPath == "" {
1326 return
1327 }
1328
1329 ml, ok := uc.messageLoggers[entity]
1330 if !ok {
1331 ml = newMessageLogger(uc.network, entity)
1332 uc.messageLoggers[entity] = ml
1333 }
1334
1335 if err := ml.Append(msg); err != nil {
1336 uc.logger.Printf("failed to log message: %v", err)
1337 }
1338}
1339
1340// appendHistory appends a message to the history. entity can be empty.
1341func (uc *upstreamConn) appendHistory(entity string, msg *irc.Message) {
1342 // If no client is offline, no need to append the message to the buffer
1343 if len(uc.network.offlineClients) == 0 {
1344 return
1345 }
1346
1347 history, ok := uc.network.history[entity]
1348 if !ok {
1349 history = &networkHistory{
1350 offlineClients: make(map[string]uint64),
1351 ring: NewRing(uc.srv.RingCap),
1352 }
1353 uc.network.history[entity] = history
1354
1355 for clientName, _ := range uc.network.offlineClients {
1356 history.offlineClients[clientName] = 0
1357 }
1358 }
1359
1360 history.ring.Produce(msg)
1361}
1362
1363// produce appends a message to the logs, adds it to the history and forwards
1364// it to connected downstream connections.
1365//
1366// If origin is not nil and origin doesn't support echo-message, the message is
1367// forwarded to all connections except origin.
1368func (uc *upstreamConn) produce(target string, msg *irc.Message, origin *downstreamConn) {
1369 if target != "" {
1370 uc.appendLog(target, msg)
1371 }
1372
1373 uc.appendHistory(target, msg)
1374
1375 uc.forEachDownstream(func(dc *downstreamConn) {
1376 if dc != origin || dc.caps["echo-message"] {
1377 dc.SendMessage(dc.marshalMessage(msg, uc.network))
1378 }
1379 })
1380}
1381
1382func (uc *upstreamConn) updateAway() {
1383 away := true
1384 uc.forEachDownstream(func(*downstreamConn) {
1385 away = false
1386 })
1387 if away == uc.away {
1388 return
1389 }
1390 if away {
1391 uc.SendMessage(&irc.Message{
1392 Command: "AWAY",
1393 Params: []string{"Auto away"},
1394 })
1395 } else {
1396 uc.SendMessage(&irc.Message{
1397 Command: "AWAY",
1398 })
1399 }
1400 uc.away = away
1401}
Note: See TracBrowser for help on using the repository browser.