source: code/trunk/upstream.go@ 277

Last change on this file since 277 was 277, checked in by contact, 5 years ago

Rename upstreamConn.caps to supportedCaps

For consistency with downstreamConn.

File size: 35.1 KB
Line 
1package soju
2
3import (
4 "crypto/tls"
5 "encoding/base64"
6 "errors"
7 "fmt"
8 "io"
9 "net"
10 "strconv"
11 "strings"
12 "time"
13
14 "github.com/emersion/go-sasl"
15 "gopkg.in/irc.v3"
16)
17
18type upstreamChannel struct {
19 Name string
20 conn *upstreamConn
21 Topic string
22 TopicWho string
23 TopicTime time.Time
24 Status channelStatus
25 modes channelModes
26 creationTime string
27 Members map[string]*membership
28 complete bool
29}
30
31type upstreamConn struct {
32 conn
33
34 network *network
35 user *user
36
37 serverName string
38 availableUserModes string
39 availableChannelModes map[byte]channelModeType
40 availableChannelTypes string
41 availableMemberships []membership
42
43 registered bool
44 nick string
45 username string
46 realname string
47 modes userModes
48 channels map[string]*upstreamChannel
49 supportedCaps map[string]string
50 batches map[string]batch
51 away bool
52
53 tagsSupported bool
54 awayNotifySupported bool
55 labelsSupported bool
56 nextLabelID uint64
57
58 saslClient sasl.Client
59 saslStarted bool
60
61 // set of LIST commands in progress, per downstream
62 pendingLISTDownstreamSet map[uint64]struct{}
63
64 messageLoggers map[string]*messageLogger
65}
66
67func connectToUpstream(network *network) (*upstreamConn, error) {
68 logger := &prefixLogger{network.user.srv.Logger, fmt.Sprintf("upstream %q: ", network.Addr)}
69
70 var scheme string
71 var addr string
72
73 addrParts := strings.SplitN(network.Addr, "://", 2)
74 if len(addrParts) == 2 {
75 scheme = addrParts[0]
76 addr = addrParts[1]
77 } else {
78 scheme = "ircs"
79 addr = addrParts[0]
80 }
81
82 dialer := net.Dialer{Timeout: connectTimeout}
83
84 var netConn net.Conn
85 var err error
86 switch scheme {
87 case "ircs":
88 if !strings.ContainsRune(addr, ':') {
89 addr = addr + ":6697"
90 }
91
92 logger.Printf("connecting to TLS server at address %q", addr)
93 netConn, err = tls.DialWithDialer(&dialer, "tcp", addr, nil)
94 case "irc+insecure":
95 if !strings.ContainsRune(addr, ':') {
96 addr = addr + ":6667"
97 }
98
99 logger.Printf("connecting to plain-text server at address %q", addr)
100 netConn, err = dialer.Dial("tcp", addr)
101 default:
102 return nil, fmt.Errorf("failed to dial %q: unknown scheme: %v", addr, scheme)
103 }
104 if err != nil {
105 return nil, fmt.Errorf("failed to dial %q: %v", addr, err)
106 }
107
108 uc := &upstreamConn{
109 conn: *newConn(network.user.srv, netConn, logger),
110 network: network,
111 user: network.user,
112 channels: make(map[string]*upstreamChannel),
113 supportedCaps: make(map[string]string),
114 batches: make(map[string]batch),
115 availableChannelTypes: stdChannelTypes,
116 availableChannelModes: stdChannelModes,
117 availableMemberships: stdMemberships,
118 pendingLISTDownstreamSet: make(map[uint64]struct{}),
119 messageLoggers: make(map[string]*messageLogger),
120 }
121
122 return uc, nil
123}
124
125func (uc *upstreamConn) forEachDownstream(f func(*downstreamConn)) {
126 uc.network.forEachDownstream(f)
127}
128
129func (uc *upstreamConn) forEachDownstreamByID(id uint64, f func(*downstreamConn)) {
130 uc.forEachDownstream(func(dc *downstreamConn) {
131 if id != 0 && id != dc.id {
132 return
133 }
134 f(dc)
135 })
136}
137
138func (uc *upstreamConn) getChannel(name string) (*upstreamChannel, error) {
139 ch, ok := uc.channels[name]
140 if !ok {
141 return nil, fmt.Errorf("unknown channel %q", name)
142 }
143 return ch, nil
144}
145
146func (uc *upstreamConn) isChannel(entity string) bool {
147 if i := strings.IndexByte(uc.availableChannelTypes, entity[0]); i >= 0 {
148 return true
149 }
150 return false
151}
152
153func (uc *upstreamConn) getPendingLIST() *pendingLIST {
154 for _, pl := range uc.user.pendingLISTs {
155 if _, ok := pl.pendingCommands[uc.network.ID]; !ok {
156 continue
157 }
158 return &pl
159 }
160 return nil
161}
162
163func (uc *upstreamConn) endPendingLISTs(all bool) (found bool) {
164 found = false
165 for i := 0; i < len(uc.user.pendingLISTs); i++ {
166 pl := uc.user.pendingLISTs[i]
167 if _, ok := pl.pendingCommands[uc.network.ID]; !ok {
168 continue
169 }
170 delete(pl.pendingCommands, uc.network.ID)
171 if len(pl.pendingCommands) == 0 {
172 uc.user.pendingLISTs = append(uc.user.pendingLISTs[:i], uc.user.pendingLISTs[i+1:]...)
173 i--
174 uc.forEachDownstreamByID(pl.downstreamID, func(dc *downstreamConn) {
175 dc.SendMessage(&irc.Message{
176 Prefix: dc.srv.prefix(),
177 Command: irc.RPL_LISTEND,
178 Params: []string{dc.nick, "End of /LIST"},
179 })
180 })
181 }
182 found = true
183 if !all {
184 delete(uc.pendingLISTDownstreamSet, pl.downstreamID)
185 uc.user.forEachUpstream(func(uc *upstreamConn) {
186 uc.trySendLIST(pl.downstreamID)
187 })
188 return
189 }
190 }
191 return
192}
193
194func (uc *upstreamConn) trySendLIST(downstreamID uint64) {
195 if _, ok := uc.pendingLISTDownstreamSet[downstreamID]; ok {
196 // a LIST command is already pending
197 // we will try again when that command is completed
198 return
199 }
200
201 for _, pl := range uc.user.pendingLISTs {
202 if pl.downstreamID != downstreamID {
203 continue
204 }
205 // this is the first pending LIST command list of the downstream
206 listCommand, ok := pl.pendingCommands[uc.network.ID]
207 if !ok {
208 // there is no command for this upstream in these LIST commands
209 // do not send anything
210 continue
211 }
212 // there is a command for this upstream in these LIST commands
213 // send it now
214
215 uc.SendMessageLabeled(downstreamID, listCommand)
216
217 uc.pendingLISTDownstreamSet[downstreamID] = struct{}{}
218 return
219 }
220}
221
222func (uc *upstreamConn) parseMembershipPrefix(s string) (membership *membership, nick string) {
223 for _, m := range uc.availableMemberships {
224 if m.Prefix == s[0] {
225 return &m, s[1:]
226 }
227 }
228 return nil, s
229}
230
231func (uc *upstreamConn) handleMessage(msg *irc.Message) error {
232 var label string
233 if l, ok := msg.GetTag("label"); ok {
234 label = l
235 }
236
237 var msgBatch *batch
238 if batchName, ok := msg.GetTag("batch"); ok {
239 b, ok := uc.batches[batchName]
240 if !ok {
241 return fmt.Errorf("unexpected batch reference: batch was not defined: %q", batchName)
242 }
243 msgBatch = &b
244 if label == "" {
245 label = msgBatch.Label
246 }
247 }
248
249 var downstreamID uint64 = 0
250 if label != "" {
251 var labelOffset uint64
252 n, err := fmt.Sscanf(label, "sd-%d-%d", &downstreamID, &labelOffset)
253 if err == nil && n < 2 {
254 err = errors.New("not enough arguments")
255 }
256 if err != nil {
257 return fmt.Errorf("unexpected message label: invalid downstream reference for label %q: %v", label, err)
258 }
259 }
260
261 if _, ok := msg.Tags["time"]; !ok {
262 msg.Tags["time"] = irc.TagValue(time.Now().UTC().Format(serverTimeLayout))
263 }
264
265 switch msg.Command {
266 case "PING":
267 uc.SendMessage(&irc.Message{
268 Command: "PONG",
269 Params: msg.Params,
270 })
271 return nil
272 case "NOTICE":
273 if msg.Prefix == nil {
274 return fmt.Errorf("expected a prefix")
275 }
276
277 if msg.Prefix.User == "" && msg.Prefix.Host == "" { // server message
278 uc.produce("", msg, nil)
279 } else { // regular user NOTICE
280 var entity, text string
281 if err := parseMessageParams(msg, &entity, &text); err != nil {
282 return err
283 }
284
285 target := entity
286 if target == uc.nick {
287 target = msg.Prefix.Name
288 }
289 uc.produce(target, msg, nil)
290 }
291 case "CAP":
292 var subCmd string
293 if err := parseMessageParams(msg, nil, &subCmd); err != nil {
294 return err
295 }
296 subCmd = strings.ToUpper(subCmd)
297 subParams := msg.Params[2:]
298 switch subCmd {
299 case "LS":
300 if len(subParams) < 1 {
301 return newNeedMoreParamsError(msg.Command)
302 }
303 caps := strings.Fields(subParams[len(subParams)-1])
304 more := len(subParams) >= 2 && msg.Params[len(subParams)-2] == "*"
305
306 for _, s := range caps {
307 kv := strings.SplitN(s, "=", 2)
308 k := strings.ToLower(kv[0])
309 var v string
310 if len(kv) == 2 {
311 v = kv[1]
312 }
313 uc.supportedCaps[k] = v
314 }
315
316 if more {
317 break // wait to receive all capabilities
318 }
319
320 requestCaps := make([]string, 0, 16)
321 for _, c := range []string{"message-tags", "batch", "labeled-response", "server-time", "away-notify"} {
322 if _, ok := uc.supportedCaps[c]; ok {
323 requestCaps = append(requestCaps, c)
324 }
325 }
326
327 if uc.requestSASL() {
328 requestCaps = append(requestCaps, "sasl")
329 }
330
331 if len(requestCaps) > 0 {
332 uc.SendMessage(&irc.Message{
333 Command: "CAP",
334 Params: []string{"REQ", strings.Join(requestCaps, " ")},
335 })
336 }
337
338 if uc.requestSASL() {
339 break // we'll send CAP END after authentication is completed
340 }
341
342 uc.SendMessage(&irc.Message{
343 Command: "CAP",
344 Params: []string{"END"},
345 })
346 case "ACK", "NAK":
347 if len(subParams) < 1 {
348 return newNeedMoreParamsError(msg.Command)
349 }
350 caps := strings.Fields(subParams[0])
351
352 for _, name := range caps {
353 if err := uc.handleCapAck(strings.ToLower(name), subCmd == "ACK"); err != nil {
354 return err
355 }
356 }
357
358 if uc.saslClient == nil {
359 uc.SendMessage(&irc.Message{
360 Command: "CAP",
361 Params: []string{"END"},
362 })
363 }
364 default:
365 uc.logger.Printf("unhandled message: %v", msg)
366 }
367 case "AUTHENTICATE":
368 if uc.saslClient == nil {
369 return fmt.Errorf("received unexpected AUTHENTICATE message")
370 }
371
372 // TODO: if a challenge is 400 bytes long, buffer it
373 var challengeStr string
374 if err := parseMessageParams(msg, &challengeStr); err != nil {
375 uc.SendMessage(&irc.Message{
376 Command: "AUTHENTICATE",
377 Params: []string{"*"},
378 })
379 return err
380 }
381
382 var challenge []byte
383 if challengeStr != "+" {
384 var err error
385 challenge, err = base64.StdEncoding.DecodeString(challengeStr)
386 if err != nil {
387 uc.SendMessage(&irc.Message{
388 Command: "AUTHENTICATE",
389 Params: []string{"*"},
390 })
391 return err
392 }
393 }
394
395 var resp []byte
396 var err error
397 if !uc.saslStarted {
398 _, resp, err = uc.saslClient.Start()
399 uc.saslStarted = true
400 } else {
401 resp, err = uc.saslClient.Next(challenge)
402 }
403 if err != nil {
404 uc.SendMessage(&irc.Message{
405 Command: "AUTHENTICATE",
406 Params: []string{"*"},
407 })
408 return err
409 }
410
411 // TODO: send response in multiple chunks if >= 400 bytes
412 var respStr = "+"
413 if resp != nil {
414 respStr = base64.StdEncoding.EncodeToString(resp)
415 }
416
417 uc.SendMessage(&irc.Message{
418 Command: "AUTHENTICATE",
419 Params: []string{respStr},
420 })
421 case irc.RPL_LOGGEDIN:
422 var account string
423 if err := parseMessageParams(msg, nil, nil, &account); err != nil {
424 return err
425 }
426 uc.logger.Printf("logged in with account %q", account)
427 case irc.RPL_LOGGEDOUT:
428 uc.logger.Printf("logged out")
429 case irc.ERR_NICKLOCKED, irc.RPL_SASLSUCCESS, irc.ERR_SASLFAIL, irc.ERR_SASLTOOLONG, irc.ERR_SASLABORTED:
430 var info string
431 if err := parseMessageParams(msg, nil, &info); err != nil {
432 return err
433 }
434 switch msg.Command {
435 case irc.ERR_NICKLOCKED:
436 uc.logger.Printf("invalid nick used with SASL authentication: %v", info)
437 case irc.ERR_SASLFAIL:
438 uc.logger.Printf("SASL authentication failed: %v", info)
439 case irc.ERR_SASLTOOLONG:
440 uc.logger.Printf("SASL message too long: %v", info)
441 }
442
443 uc.saslClient = nil
444 uc.saslStarted = false
445
446 uc.SendMessage(&irc.Message{
447 Command: "CAP",
448 Params: []string{"END"},
449 })
450 case irc.RPL_WELCOME:
451 uc.registered = true
452 uc.logger.Printf("connection registered")
453
454 uc.forEachDownstream(func(dc *downstreamConn) {
455 dc.updateSupportedCaps()
456 })
457
458 for _, ch := range uc.network.channels {
459 params := []string{ch.Name}
460 if ch.Key != "" {
461 params = append(params, ch.Key)
462 }
463 uc.SendMessage(&irc.Message{
464 Command: "JOIN",
465 Params: params,
466 })
467 }
468 case irc.RPL_MYINFO:
469 if err := parseMessageParams(msg, nil, &uc.serverName, nil, &uc.availableUserModes, nil); err != nil {
470 return err
471 }
472 case irc.RPL_ISUPPORT:
473 if err := parseMessageParams(msg, nil, nil); err != nil {
474 return err
475 }
476 for _, token := range msg.Params[1 : len(msg.Params)-1] {
477 negate := false
478 parameter := token
479 value := ""
480 if strings.HasPrefix(token, "-") {
481 negate = true
482 token = token[1:]
483 } else {
484 if i := strings.IndexByte(token, '='); i >= 0 {
485 parameter = token[:i]
486 value = token[i+1:]
487 }
488 }
489 if !negate {
490 switch parameter {
491 case "CHANMODES":
492 parts := strings.SplitN(value, ",", 5)
493 if len(parts) < 4 {
494 return fmt.Errorf("malformed ISUPPORT CHANMODES value: %v", value)
495 }
496 modes := make(map[byte]channelModeType)
497 for i, mt := range []channelModeType{modeTypeA, modeTypeB, modeTypeC, modeTypeD} {
498 for j := 0; j < len(parts[i]); j++ {
499 mode := parts[i][j]
500 modes[mode] = mt
501 }
502 }
503 uc.availableChannelModes = modes
504 case "CHANTYPES":
505 uc.availableChannelTypes = value
506 case "PREFIX":
507 if value == "" {
508 uc.availableMemberships = nil
509 } else {
510 if value[0] != '(' {
511 return fmt.Errorf("malformed ISUPPORT PREFIX value: %v", value)
512 }
513 sep := strings.IndexByte(value, ')')
514 if sep < 0 || len(value) != sep*2 {
515 return fmt.Errorf("malformed ISUPPORT PREFIX value: %v", value)
516 }
517 memberships := make([]membership, len(value)/2-1)
518 for i := range memberships {
519 memberships[i] = membership{
520 Mode: value[i+1],
521 Prefix: value[sep+i+1],
522 }
523 }
524 uc.availableMemberships = memberships
525 }
526 }
527 } else {
528 // TODO: handle ISUPPORT negations
529 }
530 }
531 case "BATCH":
532 var tag string
533 if err := parseMessageParams(msg, &tag); err != nil {
534 return err
535 }
536
537 if strings.HasPrefix(tag, "+") {
538 tag = tag[1:]
539 if _, ok := uc.batches[tag]; ok {
540 return fmt.Errorf("unexpected BATCH reference tag: batch was already defined: %q", tag)
541 }
542 var batchType string
543 if err := parseMessageParams(msg, nil, &batchType); err != nil {
544 return err
545 }
546 label := label
547 if label == "" && msgBatch != nil {
548 label = msgBatch.Label
549 }
550 uc.batches[tag] = batch{
551 Type: batchType,
552 Params: msg.Params[2:],
553 Outer: msgBatch,
554 Label: label,
555 }
556 } else if strings.HasPrefix(tag, "-") {
557 tag = tag[1:]
558 if _, ok := uc.batches[tag]; !ok {
559 return fmt.Errorf("unknown BATCH reference tag: %q", tag)
560 }
561 delete(uc.batches, tag)
562 } else {
563 return fmt.Errorf("unexpected BATCH reference tag: missing +/- prefix: %q", tag)
564 }
565 case "NICK":
566 if msg.Prefix == nil {
567 return fmt.Errorf("expected a prefix")
568 }
569
570 var newNick string
571 if err := parseMessageParams(msg, &newNick); err != nil {
572 return err
573 }
574
575 me := false
576 if msg.Prefix.Name == uc.nick {
577 uc.logger.Printf("changed nick from %q to %q", uc.nick, newNick)
578 me = true
579 uc.nick = newNick
580 }
581
582 for _, ch := range uc.channels {
583 if membership, ok := ch.Members[msg.Prefix.Name]; ok {
584 delete(ch.Members, msg.Prefix.Name)
585 ch.Members[newNick] = membership
586 uc.appendLog(ch.Name, msg)
587 uc.appendHistory(ch.Name, msg)
588 }
589 }
590
591 if !me {
592 uc.forEachDownstream(func(dc *downstreamConn) {
593 dc.SendMessage(dc.marshalMessage(msg, uc.network))
594 })
595 }
596 case "JOIN":
597 if msg.Prefix == nil {
598 return fmt.Errorf("expected a prefix")
599 }
600
601 var channels string
602 if err := parseMessageParams(msg, &channels); err != nil {
603 return err
604 }
605
606 for _, ch := range strings.Split(channels, ",") {
607 if msg.Prefix.Name == uc.nick {
608 uc.logger.Printf("joined channel %q", ch)
609 uc.channels[ch] = &upstreamChannel{
610 Name: ch,
611 conn: uc,
612 Members: make(map[string]*membership),
613 }
614
615 uc.SendMessage(&irc.Message{
616 Command: "MODE",
617 Params: []string{ch},
618 })
619 } else {
620 ch, err := uc.getChannel(ch)
621 if err != nil {
622 return err
623 }
624 ch.Members[msg.Prefix.Name] = nil
625 }
626
627 chMsg := msg.Copy()
628 chMsg.Params[0] = ch
629 uc.produce(ch, chMsg, nil)
630 }
631 case "PART":
632 if msg.Prefix == nil {
633 return fmt.Errorf("expected a prefix")
634 }
635
636 var channels string
637 if err := parseMessageParams(msg, &channels); err != nil {
638 return err
639 }
640
641 for _, ch := range strings.Split(channels, ",") {
642 if msg.Prefix.Name == uc.nick {
643 uc.logger.Printf("parted channel %q", ch)
644 delete(uc.channels, ch)
645 } else {
646 ch, err := uc.getChannel(ch)
647 if err != nil {
648 return err
649 }
650 delete(ch.Members, msg.Prefix.Name)
651 }
652
653 chMsg := msg.Copy()
654 chMsg.Params[0] = ch
655 uc.produce(ch, chMsg, nil)
656 }
657 case "KICK":
658 if msg.Prefix == nil {
659 return fmt.Errorf("expected a prefix")
660 }
661
662 var channel, user string
663 if err := parseMessageParams(msg, &channel, &user); err != nil {
664 return err
665 }
666
667 if user == uc.nick {
668 uc.logger.Printf("kicked from channel %q by %s", channel, msg.Prefix.Name)
669 delete(uc.channels, channel)
670 } else {
671 ch, err := uc.getChannel(channel)
672 if err != nil {
673 return err
674 }
675 delete(ch.Members, user)
676 }
677
678 uc.produce(channel, msg, nil)
679 case "QUIT":
680 if msg.Prefix == nil {
681 return fmt.Errorf("expected a prefix")
682 }
683
684 if msg.Prefix.Name == uc.nick {
685 uc.logger.Printf("quit")
686 }
687
688 for _, ch := range uc.channels {
689 if _, ok := ch.Members[msg.Prefix.Name]; ok {
690 delete(ch.Members, msg.Prefix.Name)
691
692 uc.appendLog(ch.Name, msg)
693 uc.appendHistory(ch.Name, msg)
694 }
695 }
696
697 if msg.Prefix.Name != uc.nick {
698 uc.forEachDownstream(func(dc *downstreamConn) {
699 dc.SendMessage(dc.marshalMessage(msg, uc.network))
700 })
701 }
702 case irc.RPL_TOPIC, irc.RPL_NOTOPIC:
703 var name, topic string
704 if err := parseMessageParams(msg, nil, &name, &topic); err != nil {
705 return err
706 }
707 ch, err := uc.getChannel(name)
708 if err != nil {
709 return err
710 }
711 if msg.Command == irc.RPL_TOPIC {
712 ch.Topic = topic
713 } else {
714 ch.Topic = ""
715 }
716 case "TOPIC":
717 var name string
718 if err := parseMessageParams(msg, &name); err != nil {
719 return err
720 }
721 ch, err := uc.getChannel(name)
722 if err != nil {
723 return err
724 }
725 if len(msg.Params) > 1 {
726 ch.Topic = msg.Params[1]
727 } else {
728 ch.Topic = ""
729 }
730 uc.produce(ch.Name, msg, nil)
731 case "MODE":
732 var name, modeStr string
733 if err := parseMessageParams(msg, &name, &modeStr); err != nil {
734 return err
735 }
736
737 if !uc.isChannel(name) { // user mode change
738 if name != uc.nick {
739 return fmt.Errorf("received MODE message for unknown nick %q", name)
740 }
741 return uc.modes.Apply(modeStr)
742 // TODO: notify downstreams about user mode change?
743 } else { // channel mode change
744 ch, err := uc.getChannel(name)
745 if err != nil {
746 return err
747 }
748
749 if ch.modes != nil {
750 if err := ch.modes.Apply(uc.availableChannelModes, modeStr, msg.Params[2:]...); err != nil {
751 return err
752 }
753 }
754
755 uc.produce(ch.Name, msg, nil)
756 }
757 case irc.RPL_UMODEIS:
758 if err := parseMessageParams(msg, nil); err != nil {
759 return err
760 }
761 modeStr := ""
762 if len(msg.Params) > 1 {
763 modeStr = msg.Params[1]
764 }
765
766 uc.modes = ""
767 if err := uc.modes.Apply(modeStr); err != nil {
768 return err
769 }
770 // TODO: send RPL_UMODEIS to downstream connections when applicable
771 case irc.RPL_CHANNELMODEIS:
772 var channel string
773 if err := parseMessageParams(msg, nil, &channel); err != nil {
774 return err
775 }
776 modeStr := ""
777 if len(msg.Params) > 2 {
778 modeStr = msg.Params[2]
779 }
780
781 ch, err := uc.getChannel(channel)
782 if err != nil {
783 return err
784 }
785
786 firstMode := ch.modes == nil
787 ch.modes = make(map[byte]string)
788 if err := ch.modes.Apply(uc.availableChannelModes, modeStr, msg.Params[3:]...); err != nil {
789 return err
790 }
791 if firstMode {
792 modeStr, modeParams := ch.modes.Format()
793
794 uc.forEachDownstream(func(dc *downstreamConn) {
795 params := []string{dc.nick, dc.marshalEntity(uc.network, channel), modeStr}
796 params = append(params, modeParams...)
797
798 dc.SendMessage(&irc.Message{
799 Prefix: dc.srv.prefix(),
800 Command: irc.RPL_CHANNELMODEIS,
801 Params: params,
802 })
803 })
804 }
805 case rpl_creationtime:
806 var channel, creationTime string
807 if err := parseMessageParams(msg, nil, &channel, &creationTime); err != nil {
808 return err
809 }
810
811 ch, err := uc.getChannel(channel)
812 if err != nil {
813 return err
814 }
815
816 firstCreationTime := ch.creationTime == ""
817 ch.creationTime = creationTime
818 if firstCreationTime {
819 uc.forEachDownstream(func(dc *downstreamConn) {
820 dc.SendMessage(&irc.Message{
821 Prefix: dc.srv.prefix(),
822 Command: rpl_creationtime,
823 Params: []string{dc.nick, channel, creationTime},
824 })
825 })
826 }
827 case rpl_topicwhotime:
828 var name, who, timeStr string
829 if err := parseMessageParams(msg, nil, &name, &who, &timeStr); err != nil {
830 return err
831 }
832 ch, err := uc.getChannel(name)
833 if err != nil {
834 return err
835 }
836 ch.TopicWho = who
837 sec, err := strconv.ParseInt(timeStr, 10, 64)
838 if err != nil {
839 return fmt.Errorf("failed to parse topic time: %v", err)
840 }
841 ch.TopicTime = time.Unix(sec, 0)
842 case irc.RPL_LIST:
843 var channel, clients, topic string
844 if err := parseMessageParams(msg, nil, &channel, &clients, &topic); err != nil {
845 return err
846 }
847
848 pl := uc.getPendingLIST()
849 if pl == nil {
850 return fmt.Errorf("unexpected RPL_LIST: no matching pending LIST")
851 }
852
853 uc.forEachDownstreamByID(pl.downstreamID, func(dc *downstreamConn) {
854 dc.SendMessage(&irc.Message{
855 Prefix: dc.srv.prefix(),
856 Command: irc.RPL_LIST,
857 Params: []string{dc.nick, dc.marshalEntity(uc.network, channel), clients, topic},
858 })
859 })
860 case irc.RPL_LISTEND:
861 ok := uc.endPendingLISTs(false)
862 if !ok {
863 return fmt.Errorf("unexpected RPL_LISTEND: no matching pending LIST")
864 }
865 case irc.RPL_NAMREPLY:
866 var name, statusStr, members string
867 if err := parseMessageParams(msg, nil, &statusStr, &name, &members); err != nil {
868 return err
869 }
870
871 ch, ok := uc.channels[name]
872 if !ok {
873 // NAMES on a channel we have not joined, forward to downstream
874 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
875 channel := dc.marshalEntity(uc.network, name)
876 members := splitSpace(members)
877 for i, member := range members {
878 membership, nick := uc.parseMembershipPrefix(member)
879 members[i] = membership.String() + dc.marshalEntity(uc.network, nick)
880 }
881 memberStr := strings.Join(members, " ")
882
883 dc.SendMessage(&irc.Message{
884 Prefix: dc.srv.prefix(),
885 Command: irc.RPL_NAMREPLY,
886 Params: []string{dc.nick, statusStr, channel, memberStr},
887 })
888 })
889 return nil
890 }
891
892 status, err := parseChannelStatus(statusStr)
893 if err != nil {
894 return err
895 }
896 ch.Status = status
897
898 for _, s := range splitSpace(members) {
899 membership, nick := uc.parseMembershipPrefix(s)
900 ch.Members[nick] = membership
901 }
902 case irc.RPL_ENDOFNAMES:
903 var name string
904 if err := parseMessageParams(msg, nil, &name); err != nil {
905 return err
906 }
907
908 ch, ok := uc.channels[name]
909 if !ok {
910 // NAMES on a channel we have not joined, forward to downstream
911 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
912 channel := dc.marshalEntity(uc.network, name)
913
914 dc.SendMessage(&irc.Message{
915 Prefix: dc.srv.prefix(),
916 Command: irc.RPL_ENDOFNAMES,
917 Params: []string{dc.nick, channel, "End of /NAMES list"},
918 })
919 })
920 return nil
921 }
922
923 if ch.complete {
924 return fmt.Errorf("received unexpected RPL_ENDOFNAMES")
925 }
926 ch.complete = true
927
928 uc.forEachDownstream(func(dc *downstreamConn) {
929 forwardChannel(dc, ch)
930 })
931 case irc.RPL_WHOREPLY:
932 var channel, username, host, server, nick, mode, trailing string
933 if err := parseMessageParams(msg, nil, &channel, &username, &host, &server, &nick, &mode, &trailing); err != nil {
934 return err
935 }
936
937 parts := strings.SplitN(trailing, " ", 2)
938 if len(parts) != 2 {
939 return fmt.Errorf("received malformed RPL_WHOREPLY: wrong trailing parameter: %s", trailing)
940 }
941 realname := parts[1]
942 hops, err := strconv.Atoi(parts[0])
943 if err != nil {
944 return fmt.Errorf("received malformed RPL_WHOREPLY: wrong hop count: %s", parts[0])
945 }
946 hops++
947
948 trailing = strconv.Itoa(hops) + " " + realname
949
950 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
951 channel := channel
952 if channel != "*" {
953 channel = dc.marshalEntity(uc.network, channel)
954 }
955 nick := dc.marshalEntity(uc.network, nick)
956 dc.SendMessage(&irc.Message{
957 Prefix: dc.srv.prefix(),
958 Command: irc.RPL_WHOREPLY,
959 Params: []string{dc.nick, channel, username, host, server, nick, mode, trailing},
960 })
961 })
962 case irc.RPL_ENDOFWHO:
963 var name string
964 if err := parseMessageParams(msg, nil, &name); err != nil {
965 return err
966 }
967
968 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
969 name := name
970 if name != "*" {
971 // TODO: support WHO masks
972 name = dc.marshalEntity(uc.network, name)
973 }
974 dc.SendMessage(&irc.Message{
975 Prefix: dc.srv.prefix(),
976 Command: irc.RPL_ENDOFWHO,
977 Params: []string{dc.nick, name, "End of /WHO list"},
978 })
979 })
980 case irc.RPL_WHOISUSER:
981 var nick, username, host, realname string
982 if err := parseMessageParams(msg, nil, &nick, &username, &host, nil, &realname); err != nil {
983 return err
984 }
985
986 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
987 nick := dc.marshalEntity(uc.network, nick)
988 dc.SendMessage(&irc.Message{
989 Prefix: dc.srv.prefix(),
990 Command: irc.RPL_WHOISUSER,
991 Params: []string{dc.nick, nick, username, host, "*", realname},
992 })
993 })
994 case irc.RPL_WHOISSERVER:
995 var nick, server, serverInfo string
996 if err := parseMessageParams(msg, nil, &nick, &server, &serverInfo); err != nil {
997 return err
998 }
999
1000 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1001 nick := dc.marshalEntity(uc.network, nick)
1002 dc.SendMessage(&irc.Message{
1003 Prefix: dc.srv.prefix(),
1004 Command: irc.RPL_WHOISSERVER,
1005 Params: []string{dc.nick, nick, server, serverInfo},
1006 })
1007 })
1008 case irc.RPL_WHOISOPERATOR:
1009 var nick string
1010 if err := parseMessageParams(msg, nil, &nick); err != nil {
1011 return err
1012 }
1013
1014 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1015 nick := dc.marshalEntity(uc.network, nick)
1016 dc.SendMessage(&irc.Message{
1017 Prefix: dc.srv.prefix(),
1018 Command: irc.RPL_WHOISOPERATOR,
1019 Params: []string{dc.nick, nick, "is an IRC operator"},
1020 })
1021 })
1022 case irc.RPL_WHOISIDLE:
1023 var nick string
1024 if err := parseMessageParams(msg, nil, &nick, nil); err != nil {
1025 return err
1026 }
1027
1028 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1029 nick := dc.marshalEntity(uc.network, nick)
1030 params := []string{dc.nick, nick}
1031 params = append(params, msg.Params[2:]...)
1032 dc.SendMessage(&irc.Message{
1033 Prefix: dc.srv.prefix(),
1034 Command: irc.RPL_WHOISIDLE,
1035 Params: params,
1036 })
1037 })
1038 case irc.RPL_WHOISCHANNELS:
1039 var nick, channelList string
1040 if err := parseMessageParams(msg, nil, &nick, &channelList); err != nil {
1041 return err
1042 }
1043 channels := splitSpace(channelList)
1044
1045 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1046 nick := dc.marshalEntity(uc.network, nick)
1047 channelList := make([]string, len(channels))
1048 for i, channel := range channels {
1049 prefix, channel := uc.parseMembershipPrefix(channel)
1050 channel = dc.marshalEntity(uc.network, channel)
1051 channelList[i] = prefix.String() + channel
1052 }
1053 channels := strings.Join(channelList, " ")
1054 dc.SendMessage(&irc.Message{
1055 Prefix: dc.srv.prefix(),
1056 Command: irc.RPL_WHOISCHANNELS,
1057 Params: []string{dc.nick, nick, channels},
1058 })
1059 })
1060 case irc.RPL_ENDOFWHOIS:
1061 var nick string
1062 if err := parseMessageParams(msg, nil, &nick); err != nil {
1063 return err
1064 }
1065
1066 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1067 nick := dc.marshalEntity(uc.network, nick)
1068 dc.SendMessage(&irc.Message{
1069 Prefix: dc.srv.prefix(),
1070 Command: irc.RPL_ENDOFWHOIS,
1071 Params: []string{dc.nick, nick, "End of /WHOIS list"},
1072 })
1073 })
1074 case "PRIVMSG":
1075 if msg.Prefix == nil {
1076 return fmt.Errorf("expected a prefix")
1077 }
1078
1079 var entity, text string
1080 if err := parseMessageParams(msg, &entity, &text); err != nil {
1081 return err
1082 }
1083
1084 if msg.Prefix.Name == serviceNick {
1085 uc.logger.Printf("skipping PRIVMSG from soju's service: %v", msg)
1086 break
1087 }
1088 if entity == serviceNick {
1089 uc.logger.Printf("skipping PRIVMSG to soju's service: %v", msg)
1090 break
1091 }
1092
1093 target := entity
1094 if target == uc.nick {
1095 target = msg.Prefix.Name
1096 }
1097 uc.produce(target, msg, nil)
1098 case "INVITE":
1099 var nick, channel string
1100 if err := parseMessageParams(msg, &nick, &channel); err != nil {
1101 return err
1102 }
1103
1104 uc.forEachDownstream(func(dc *downstreamConn) {
1105 dc.SendMessage(&irc.Message{
1106 Prefix: dc.marshalUserPrefix(uc.network, msg.Prefix),
1107 Command: "INVITE",
1108 Params: []string{dc.marshalEntity(uc.network, nick), dc.marshalEntity(uc.network, channel)},
1109 })
1110 })
1111 case irc.RPL_INVITING:
1112 var nick, channel string
1113 if err := parseMessageParams(msg, &nick, &channel); err != nil {
1114 return err
1115 }
1116
1117 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1118 dc.SendMessage(&irc.Message{
1119 Prefix: dc.srv.prefix(),
1120 Command: irc.RPL_INVITING,
1121 Params: []string{dc.nick, dc.marshalEntity(uc.network, nick), dc.marshalEntity(uc.network, channel)},
1122 })
1123 })
1124 case irc.ERR_UNKNOWNCOMMAND, irc.RPL_TRYAGAIN:
1125 var command, reason string
1126 if err := parseMessageParams(msg, nil, &command, &reason); err != nil {
1127 return err
1128 }
1129
1130 if command == "LIST" {
1131 ok := uc.endPendingLISTs(false)
1132 if !ok {
1133 return fmt.Errorf("unexpected response for LIST: %q: no matching pending LIST", msg.Command)
1134 }
1135 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1136 dc.SendMessage(&irc.Message{
1137 Prefix: uc.srv.prefix(),
1138 Command: msg.Command,
1139 Params: []string{dc.nick, "LIST", reason},
1140 })
1141 })
1142 }
1143 case irc.RPL_AWAY:
1144 var nick, reason string
1145 if err := parseMessageParams(msg, nil, &nick, &reason); err != nil {
1146 return err
1147 }
1148
1149 uc.forEachDownstream(func(dc *downstreamConn) {
1150 dc.SendMessage(&irc.Message{
1151 Prefix: dc.srv.prefix(),
1152 Command: irc.RPL_AWAY,
1153 Params: []string{dc.nick, dc.marshalEntity(uc.network, nick), reason},
1154 })
1155 })
1156 case "AWAY":
1157 if msg.Prefix == nil {
1158 return fmt.Errorf("expected a prefix")
1159 }
1160
1161 uc.forEachDownstream(func(dc *downstreamConn) {
1162 if !dc.caps["away-notify"] {
1163 return
1164 }
1165 dc.SendMessage(&irc.Message{
1166 Prefix: dc.marshalUserPrefix(uc.network, msg.Prefix),
1167 Command: "AWAY",
1168 Params: msg.Params,
1169 })
1170 })
1171 case "TAGMSG":
1172 // TODO: relay to downstream connections that accept message-tags
1173 case "ACK":
1174 // Ignore
1175 case irc.RPL_NOWAWAY, irc.RPL_UNAWAY:
1176 // Ignore
1177 case irc.RPL_YOURHOST, irc.RPL_CREATED:
1178 // Ignore
1179 case irc.RPL_LUSERCLIENT, irc.RPL_LUSEROP, irc.RPL_LUSERUNKNOWN, irc.RPL_LUSERCHANNELS, irc.RPL_LUSERME:
1180 // Ignore
1181 case irc.RPL_MOTDSTART, irc.RPL_MOTD, irc.RPL_ENDOFMOTD:
1182 // Ignore
1183 case irc.RPL_LISTSTART:
1184 // Ignore
1185 case rpl_localusers, rpl_globalusers:
1186 // Ignore
1187 case irc.RPL_STATSVLINE, rpl_statsping, irc.RPL_STATSBLINE, irc.RPL_STATSDLINE:
1188 // Ignore
1189 default:
1190 uc.logger.Printf("unhandled message: %v", msg)
1191 }
1192 return nil
1193}
1194
// splitSpace cuts s into the fields separated by one or more ASCII space
// characters (' '). Only the space character is treated as a separator; tabs
// and other whitespace are kept inside the fields. Empty fields are never
// returned.
func splitSpace(s string) []string {
	isSpace := func(c rune) bool { return c == ' ' }
	return strings.FieldsFunc(s, isSpace)
}
1200
1201func (uc *upstreamConn) register() {
1202 uc.nick = uc.network.Nick
1203 uc.username = uc.network.Username
1204 if uc.username == "" {
1205 uc.username = uc.nick
1206 }
1207 uc.realname = uc.network.Realname
1208 if uc.realname == "" {
1209 uc.realname = uc.nick
1210 }
1211
1212 uc.SendMessage(&irc.Message{
1213 Command: "CAP",
1214 Params: []string{"LS", "302"},
1215 })
1216
1217 if uc.network.Pass != "" {
1218 uc.SendMessage(&irc.Message{
1219 Command: "PASS",
1220 Params: []string{uc.network.Pass},
1221 })
1222 }
1223
1224 uc.SendMessage(&irc.Message{
1225 Command: "NICK",
1226 Params: []string{uc.nick},
1227 })
1228 uc.SendMessage(&irc.Message{
1229 Command: "USER",
1230 Params: []string{uc.username, "0", "*", uc.realname},
1231 })
1232}
1233
1234func (uc *upstreamConn) runUntilRegistered() error {
1235 for !uc.registered {
1236 msg, err := uc.ReadMessage()
1237 if err != nil {
1238 return fmt.Errorf("failed to read message: %v", err)
1239 }
1240
1241 if err := uc.handleMessage(msg); err != nil {
1242 return fmt.Errorf("failed to handle message %q: %v", msg, err)
1243 }
1244 }
1245
1246 for _, command := range uc.network.ConnectCommands {
1247 m, err := irc.ParseMessage(command)
1248 if err != nil {
1249 uc.logger.Printf("failed to parse connect command %q: %v", command, err)
1250 } else {
1251 uc.SendMessage(m)
1252 }
1253 }
1254
1255 return nil
1256}
1257
1258func (uc *upstreamConn) requestSASL() bool {
1259 if uc.network.SASL.Mechanism == "" {
1260 return false
1261 }
1262
1263 v, ok := uc.supportedCaps["sasl"]
1264 if !ok {
1265 return false
1266 }
1267 if v != "" {
1268 mechanisms := strings.Split(v, ",")
1269 found := false
1270 for _, mech := range mechanisms {
1271 if strings.EqualFold(mech, uc.network.SASL.Mechanism) {
1272 found = true
1273 break
1274 }
1275 }
1276 if !found {
1277 return false
1278 }
1279 }
1280
1281 return true
1282}
1283
1284func (uc *upstreamConn) handleCapAck(name string, ok bool) error {
1285 switch name {
1286 case "sasl":
1287 if !ok {
1288 uc.logger.Printf("server refused to acknowledge the SASL capability")
1289 return nil
1290 }
1291
1292 auth := &uc.network.SASL
1293 switch auth.Mechanism {
1294 case "PLAIN":
1295 uc.logger.Printf("starting SASL PLAIN authentication with username %q", auth.Plain.Username)
1296 uc.saslClient = sasl.NewPlainClient("", auth.Plain.Username, auth.Plain.Password)
1297 default:
1298 return fmt.Errorf("unsupported SASL mechanism %q", name)
1299 }
1300
1301 uc.SendMessage(&irc.Message{
1302 Command: "AUTHENTICATE",
1303 Params: []string{auth.Mechanism},
1304 })
1305 case "message-tags":
1306 uc.tagsSupported = ok
1307 case "labeled-response":
1308 uc.labelsSupported = ok
1309 case "away-notify":
1310 uc.awayNotifySupported = ok
1311 case "batch", "server-time":
1312 // Nothing to do
1313 default:
1314 uc.logger.Printf("received CAP ACK/NAK for a cap we don't support: %v", name)
1315 }
1316 return nil
1317}
1318
1319func (uc *upstreamConn) readMessages(ch chan<- event) error {
1320 for {
1321 msg, err := uc.ReadMessage()
1322 if err == io.EOF {
1323 break
1324 } else if err != nil {
1325 return fmt.Errorf("failed to read IRC command: %v", err)
1326 }
1327
1328 ch <- eventUpstreamMessage{msg, uc}
1329 }
1330
1331 return nil
1332}
1333
1334func (uc *upstreamConn) SendMessageLabeled(downstreamID uint64, msg *irc.Message) {
1335 if uc.labelsSupported {
1336 if msg.Tags == nil {
1337 msg.Tags = make(map[string]irc.TagValue)
1338 }
1339 msg.Tags["label"] = irc.TagValue(fmt.Sprintf("sd-%d-%d", downstreamID, uc.nextLabelID))
1340 uc.nextLabelID++
1341 }
1342 uc.SendMessage(msg)
1343}
1344
1345// TODO: handle moving logs when a network name changes, when support for this is added
1346func (uc *upstreamConn) appendLog(entity string, msg *irc.Message) {
1347 if uc.srv.LogPath == "" {
1348 return
1349 }
1350
1351 ml, ok := uc.messageLoggers[entity]
1352 if !ok {
1353 ml = newMessageLogger(uc.network, entity)
1354 uc.messageLoggers[entity] = ml
1355 }
1356
1357 if err := ml.Append(msg); err != nil {
1358 uc.logger.Printf("failed to log message: %v", err)
1359 }
1360}
1361
1362// appendHistory appends a message to the history. entity can be empty.
1363func (uc *upstreamConn) appendHistory(entity string, msg *irc.Message) {
1364 // If no client is offline, no need to append the message to the buffer
1365 if len(uc.network.offlineClients) == 0 {
1366 return
1367 }
1368
1369 history, ok := uc.network.history[entity]
1370 if !ok {
1371 history = &networkHistory{
1372 offlineClients: make(map[string]uint64),
1373 ring: NewRing(uc.srv.RingCap),
1374 }
1375 uc.network.history[entity] = history
1376
1377 for clientName, _ := range uc.network.offlineClients {
1378 history.offlineClients[clientName] = 0
1379 }
1380 }
1381
1382 history.ring.Produce(msg)
1383}
1384
1385// produce appends a message to the logs, adds it to the history and forwards
1386// it to connected downstream connections.
1387//
1388// If origin is not nil and origin doesn't support echo-message, the message is
1389// forwarded to all connections except origin.
1390func (uc *upstreamConn) produce(target string, msg *irc.Message, origin *downstreamConn) {
1391 if target != "" {
1392 uc.appendLog(target, msg)
1393 }
1394
1395 uc.appendHistory(target, msg)
1396
1397 uc.forEachDownstream(func(dc *downstreamConn) {
1398 if dc != origin || dc.caps["echo-message"] {
1399 dc.SendMessage(dc.marshalMessage(msg, uc.network))
1400 }
1401 })
1402}
1403
1404func (uc *upstreamConn) updateAway() {
1405 away := true
1406 uc.forEachDownstream(func(*downstreamConn) {
1407 away = false
1408 })
1409 if away == uc.away {
1410 return
1411 }
1412 if away {
1413 uc.SendMessage(&irc.Message{
1414 Command: "AWAY",
1415 Params: []string{"Auto away"},
1416 })
1417 } else {
1418 uc.SendMessage(&irc.Message{
1419 Command: "AWAY",
1420 })
1421 }
1422 uc.away = away
1423}
Note: See TracBrowser for help on using the repository browser.