source: code/trunk/upstream.go@ 280

Last change on this file since 280 was 278, checked in by contact, 5 years ago

Add upstreamConn.caps

Instead of adding one field per capability, let's just have a map, just
like downstreamConn.

File size: 35.0 KB
Line 
1package soju
2
3import (
4 "crypto/tls"
5 "encoding/base64"
6 "errors"
7 "fmt"
8 "io"
9 "net"
10 "strconv"
11 "strings"
12 "time"
13
14 "github.com/emersion/go-sasl"
15 "gopkg.in/irc.v3"
16)
17
// upstreamChannel holds the state tracked for a channel joined on an
// upstream connection.
type upstreamChannel struct {
	// Name is the channel name as reported by the upstream server.
	Name string
	// conn is the upstream connection this channel belongs to.
	conn *upstreamConn
	// Topic is updated from RPL_TOPIC/RPL_NOTOPIC replies and TOPIC messages;
	// it is empty when no topic is set.
	Topic string
	// TopicWho and TopicTime are filled in from the rpl_topicwhotime reply.
	TopicWho  string
	TopicTime time.Time
	// Status is parsed from the status parameter of RPL_NAMREPLY.
	Status channelStatus
	// modes stays nil until the first RPL_CHANNELMODEIS is received; MODE
	// changes are only applied once it is non-nil.
	modes channelModes
	// creationTime is the raw value of the rpl_creationtime reply.
	creationTime string
	// Members maps a nick to its membership; the value is nil when no
	// membership prefix is known for that nick (e.g. right after a JOIN).
	Members map[string]*membership
	// complete is set once RPL_ENDOFNAMES has been received for the channel.
	complete bool
}
30
// upstreamConn is a connection to an upstream IRC server, together with the
// per-connection state learned while talking to that server.
type upstreamConn struct {
	conn

	network *network
	user    *user

	// serverName and availableUserModes are filled in from RPL_MYINFO.
	serverName         string
	availableUserModes string
	// The three fields below start out with the std* defaults and are
	// overridden by the CHANMODES, CHANTYPES and PREFIX tokens of RPL_ISUPPORT.
	availableChannelModes map[byte]channelModeType
	availableChannelTypes string
	availableMemberships  []membership

	// registered is set when RPL_WELCOME is received.
	registered bool
	// nick, username and realname are initialized by register() from the
	// network configuration; nick follows our own NICK changes afterwards.
	nick     string
	username string
	realname string
	// modes holds our user modes (RPL_UMODEIS and MODE on our own nick).
	modes userModes
	// channels holds the channels we have joined, keyed by channel name.
	channels map[string]*upstreamChannel
	// supportedCaps maps lower-cased capability names advertised in CAP LS
	// to their value (empty when the capability has no value).
	supportedCaps map[string]string
	// caps is keyed by capability name.
	// NOTE(review): presumably the set of enabled capabilities, maintained by
	// handleCapAck — not visible in this part of the file, confirm.
	caps map[string]bool
	// batches holds the currently open BATCHes, keyed by reference tag.
	batches map[string]batch
	away    bool
	// NOTE(review): presumably the counter used to build "sd-<id>-<n>" labels
	// in SendMessageLabeled — confirm.
	nextLabelID uint64

	// saslClient is nil when no SASL authentication is in progress;
	// saslStarted records whether saslClient.Start() has been called.
	saslClient  sasl.Client
	saslStarted bool

	// set of LIST commands in progress, per downstream
	pendingLISTDownstreamSet map[uint64]struct{}

	messageLoggers map[string]*messageLogger
}
63
64func connectToUpstream(network *network) (*upstreamConn, error) {
65 logger := &prefixLogger{network.user.srv.Logger, fmt.Sprintf("upstream %q: ", network.Addr)}
66
67 var scheme string
68 var addr string
69
70 addrParts := strings.SplitN(network.Addr, "://", 2)
71 if len(addrParts) == 2 {
72 scheme = addrParts[0]
73 addr = addrParts[1]
74 } else {
75 scheme = "ircs"
76 addr = addrParts[0]
77 }
78
79 dialer := net.Dialer{Timeout: connectTimeout}
80
81 var netConn net.Conn
82 var err error
83 switch scheme {
84 case "ircs":
85 if !strings.ContainsRune(addr, ':') {
86 addr = addr + ":6697"
87 }
88
89 logger.Printf("connecting to TLS server at address %q", addr)
90 netConn, err = tls.DialWithDialer(&dialer, "tcp", addr, nil)
91 case "irc+insecure":
92 if !strings.ContainsRune(addr, ':') {
93 addr = addr + ":6667"
94 }
95
96 logger.Printf("connecting to plain-text server at address %q", addr)
97 netConn, err = dialer.Dial("tcp", addr)
98 default:
99 return nil, fmt.Errorf("failed to dial %q: unknown scheme: %v", addr, scheme)
100 }
101 if err != nil {
102 return nil, fmt.Errorf("failed to dial %q: %v", addr, err)
103 }
104
105 uc := &upstreamConn{
106 conn: *newConn(network.user.srv, netConn, logger),
107 network: network,
108 user: network.user,
109 channels: make(map[string]*upstreamChannel),
110 supportedCaps: make(map[string]string),
111 caps: make(map[string]bool),
112 batches: make(map[string]batch),
113 availableChannelTypes: stdChannelTypes,
114 availableChannelModes: stdChannelModes,
115 availableMemberships: stdMemberships,
116 pendingLISTDownstreamSet: make(map[uint64]struct{}),
117 messageLoggers: make(map[string]*messageLogger),
118 }
119
120 return uc, nil
121}
122
// forEachDownstream calls f for the downstream connections of the network
// this upstream connection belongs to, by delegating to
// network.forEachDownstream.
func (uc *upstreamConn) forEachDownstream(f func(*downstreamConn)) {
	uc.network.forEachDownstream(f)
}
126
127func (uc *upstreamConn) forEachDownstreamByID(id uint64, f func(*downstreamConn)) {
128 uc.forEachDownstream(func(dc *downstreamConn) {
129 if id != 0 && id != dc.id {
130 return
131 }
132 f(dc)
133 })
134}
135
136func (uc *upstreamConn) getChannel(name string) (*upstreamChannel, error) {
137 ch, ok := uc.channels[name]
138 if !ok {
139 return nil, fmt.Errorf("unknown channel %q", name)
140 }
141 return ch, nil
142}
143
144func (uc *upstreamConn) isChannel(entity string) bool {
145 if i := strings.IndexByte(uc.availableChannelTypes, entity[0]); i >= 0 {
146 return true
147 }
148 return false
149}
150
151func (uc *upstreamConn) getPendingLIST() *pendingLIST {
152 for _, pl := range uc.user.pendingLISTs {
153 if _, ok := pl.pendingCommands[uc.network.ID]; !ok {
154 continue
155 }
156 return &pl
157 }
158 return nil
159}
160
161func (uc *upstreamConn) endPendingLISTs(all bool) (found bool) {
162 found = false
163 for i := 0; i < len(uc.user.pendingLISTs); i++ {
164 pl := uc.user.pendingLISTs[i]
165 if _, ok := pl.pendingCommands[uc.network.ID]; !ok {
166 continue
167 }
168 delete(pl.pendingCommands, uc.network.ID)
169 if len(pl.pendingCommands) == 0 {
170 uc.user.pendingLISTs = append(uc.user.pendingLISTs[:i], uc.user.pendingLISTs[i+1:]...)
171 i--
172 uc.forEachDownstreamByID(pl.downstreamID, func(dc *downstreamConn) {
173 dc.SendMessage(&irc.Message{
174 Prefix: dc.srv.prefix(),
175 Command: irc.RPL_LISTEND,
176 Params: []string{dc.nick, "End of /LIST"},
177 })
178 })
179 }
180 found = true
181 if !all {
182 delete(uc.pendingLISTDownstreamSet, pl.downstreamID)
183 uc.user.forEachUpstream(func(uc *upstreamConn) {
184 uc.trySendLIST(pl.downstreamID)
185 })
186 return
187 }
188 }
189 return
190}
191
192func (uc *upstreamConn) trySendLIST(downstreamID uint64) {
193 if _, ok := uc.pendingLISTDownstreamSet[downstreamID]; ok {
194 // a LIST command is already pending
195 // we will try again when that command is completed
196 return
197 }
198
199 for _, pl := range uc.user.pendingLISTs {
200 if pl.downstreamID != downstreamID {
201 continue
202 }
203 // this is the first pending LIST command list of the downstream
204 listCommand, ok := pl.pendingCommands[uc.network.ID]
205 if !ok {
206 // there is no command for this upstream in these LIST commands
207 // do not send anything
208 continue
209 }
210 // there is a command for this upstream in these LIST commands
211 // send it now
212
213 uc.SendMessageLabeled(downstreamID, listCommand)
214
215 uc.pendingLISTDownstreamSet[downstreamID] = struct{}{}
216 return
217 }
218}
219
220func (uc *upstreamConn) parseMembershipPrefix(s string) (membership *membership, nick string) {
221 for _, m := range uc.availableMemberships {
222 if m.Prefix == s[0] {
223 return &m, s[1:]
224 }
225 }
226 return nil, s
227}
228
229func (uc *upstreamConn) handleMessage(msg *irc.Message) error {
230 var label string
231 if l, ok := msg.GetTag("label"); ok {
232 label = l
233 }
234
235 var msgBatch *batch
236 if batchName, ok := msg.GetTag("batch"); ok {
237 b, ok := uc.batches[batchName]
238 if !ok {
239 return fmt.Errorf("unexpected batch reference: batch was not defined: %q", batchName)
240 }
241 msgBatch = &b
242 if label == "" {
243 label = msgBatch.Label
244 }
245 }
246
247 var downstreamID uint64 = 0
248 if label != "" {
249 var labelOffset uint64
250 n, err := fmt.Sscanf(label, "sd-%d-%d", &downstreamID, &labelOffset)
251 if err == nil && n < 2 {
252 err = errors.New("not enough arguments")
253 }
254 if err != nil {
255 return fmt.Errorf("unexpected message label: invalid downstream reference for label %q: %v", label, err)
256 }
257 }
258
259 if _, ok := msg.Tags["time"]; !ok {
260 msg.Tags["time"] = irc.TagValue(time.Now().UTC().Format(serverTimeLayout))
261 }
262
263 switch msg.Command {
264 case "PING":
265 uc.SendMessage(&irc.Message{
266 Command: "PONG",
267 Params: msg.Params,
268 })
269 return nil
270 case "NOTICE":
271 if msg.Prefix == nil {
272 return fmt.Errorf("expected a prefix")
273 }
274
275 if msg.Prefix.User == "" && msg.Prefix.Host == "" { // server message
276 uc.produce("", msg, nil)
277 } else { // regular user NOTICE
278 var entity, text string
279 if err := parseMessageParams(msg, &entity, &text); err != nil {
280 return err
281 }
282
283 target := entity
284 if target == uc.nick {
285 target = msg.Prefix.Name
286 }
287 uc.produce(target, msg, nil)
288 }
289 case "CAP":
290 var subCmd string
291 if err := parseMessageParams(msg, nil, &subCmd); err != nil {
292 return err
293 }
294 subCmd = strings.ToUpper(subCmd)
295 subParams := msg.Params[2:]
296 switch subCmd {
297 case "LS":
298 if len(subParams) < 1 {
299 return newNeedMoreParamsError(msg.Command)
300 }
301 caps := strings.Fields(subParams[len(subParams)-1])
302 more := len(subParams) >= 2 && msg.Params[len(subParams)-2] == "*"
303
304 for _, s := range caps {
305 kv := strings.SplitN(s, "=", 2)
306 k := strings.ToLower(kv[0])
307 var v string
308 if len(kv) == 2 {
309 v = kv[1]
310 }
311 uc.supportedCaps[k] = v
312 }
313
314 if more {
315 break // wait to receive all capabilities
316 }
317
318 requestCaps := make([]string, 0, 16)
319 for _, c := range []string{"message-tags", "batch", "labeled-response", "server-time", "away-notify"} {
320 if _, ok := uc.supportedCaps[c]; ok {
321 requestCaps = append(requestCaps, c)
322 }
323 }
324
325 if uc.requestSASL() {
326 requestCaps = append(requestCaps, "sasl")
327 }
328
329 if len(requestCaps) > 0 {
330 uc.SendMessage(&irc.Message{
331 Command: "CAP",
332 Params: []string{"REQ", strings.Join(requestCaps, " ")},
333 })
334 }
335
336 if uc.requestSASL() {
337 break // we'll send CAP END after authentication is completed
338 }
339
340 uc.SendMessage(&irc.Message{
341 Command: "CAP",
342 Params: []string{"END"},
343 })
344 case "ACK", "NAK":
345 if len(subParams) < 1 {
346 return newNeedMoreParamsError(msg.Command)
347 }
348 caps := strings.Fields(subParams[0])
349
350 for _, name := range caps {
351 if err := uc.handleCapAck(strings.ToLower(name), subCmd == "ACK"); err != nil {
352 return err
353 }
354 }
355
356 if uc.saslClient == nil {
357 uc.SendMessage(&irc.Message{
358 Command: "CAP",
359 Params: []string{"END"},
360 })
361 }
362 default:
363 uc.logger.Printf("unhandled message: %v", msg)
364 }
365 case "AUTHENTICATE":
366 if uc.saslClient == nil {
367 return fmt.Errorf("received unexpected AUTHENTICATE message")
368 }
369
370 // TODO: if a challenge is 400 bytes long, buffer it
371 var challengeStr string
372 if err := parseMessageParams(msg, &challengeStr); err != nil {
373 uc.SendMessage(&irc.Message{
374 Command: "AUTHENTICATE",
375 Params: []string{"*"},
376 })
377 return err
378 }
379
380 var challenge []byte
381 if challengeStr != "+" {
382 var err error
383 challenge, err = base64.StdEncoding.DecodeString(challengeStr)
384 if err != nil {
385 uc.SendMessage(&irc.Message{
386 Command: "AUTHENTICATE",
387 Params: []string{"*"},
388 })
389 return err
390 }
391 }
392
393 var resp []byte
394 var err error
395 if !uc.saslStarted {
396 _, resp, err = uc.saslClient.Start()
397 uc.saslStarted = true
398 } else {
399 resp, err = uc.saslClient.Next(challenge)
400 }
401 if err != nil {
402 uc.SendMessage(&irc.Message{
403 Command: "AUTHENTICATE",
404 Params: []string{"*"},
405 })
406 return err
407 }
408
409 // TODO: send response in multiple chunks if >= 400 bytes
410 var respStr = "+"
411 if resp != nil {
412 respStr = base64.StdEncoding.EncodeToString(resp)
413 }
414
415 uc.SendMessage(&irc.Message{
416 Command: "AUTHENTICATE",
417 Params: []string{respStr},
418 })
419 case irc.RPL_LOGGEDIN:
420 var account string
421 if err := parseMessageParams(msg, nil, nil, &account); err != nil {
422 return err
423 }
424 uc.logger.Printf("logged in with account %q", account)
425 case irc.RPL_LOGGEDOUT:
426 uc.logger.Printf("logged out")
427 case irc.ERR_NICKLOCKED, irc.RPL_SASLSUCCESS, irc.ERR_SASLFAIL, irc.ERR_SASLTOOLONG, irc.ERR_SASLABORTED:
428 var info string
429 if err := parseMessageParams(msg, nil, &info); err != nil {
430 return err
431 }
432 switch msg.Command {
433 case irc.ERR_NICKLOCKED:
434 uc.logger.Printf("invalid nick used with SASL authentication: %v", info)
435 case irc.ERR_SASLFAIL:
436 uc.logger.Printf("SASL authentication failed: %v", info)
437 case irc.ERR_SASLTOOLONG:
438 uc.logger.Printf("SASL message too long: %v", info)
439 }
440
441 uc.saslClient = nil
442 uc.saslStarted = false
443
444 uc.SendMessage(&irc.Message{
445 Command: "CAP",
446 Params: []string{"END"},
447 })
448 case irc.RPL_WELCOME:
449 uc.registered = true
450 uc.logger.Printf("connection registered")
451
452 uc.forEachDownstream(func(dc *downstreamConn) {
453 dc.updateSupportedCaps()
454 })
455
456 for _, ch := range uc.network.channels {
457 params := []string{ch.Name}
458 if ch.Key != "" {
459 params = append(params, ch.Key)
460 }
461 uc.SendMessage(&irc.Message{
462 Command: "JOIN",
463 Params: params,
464 })
465 }
466 case irc.RPL_MYINFO:
467 if err := parseMessageParams(msg, nil, &uc.serverName, nil, &uc.availableUserModes, nil); err != nil {
468 return err
469 }
470 case irc.RPL_ISUPPORT:
471 if err := parseMessageParams(msg, nil, nil); err != nil {
472 return err
473 }
474 for _, token := range msg.Params[1 : len(msg.Params)-1] {
475 negate := false
476 parameter := token
477 value := ""
478 if strings.HasPrefix(token, "-") {
479 negate = true
480 token = token[1:]
481 } else {
482 if i := strings.IndexByte(token, '='); i >= 0 {
483 parameter = token[:i]
484 value = token[i+1:]
485 }
486 }
487 if !negate {
488 switch parameter {
489 case "CHANMODES":
490 parts := strings.SplitN(value, ",", 5)
491 if len(parts) < 4 {
492 return fmt.Errorf("malformed ISUPPORT CHANMODES value: %v", value)
493 }
494 modes := make(map[byte]channelModeType)
495 for i, mt := range []channelModeType{modeTypeA, modeTypeB, modeTypeC, modeTypeD} {
496 for j := 0; j < len(parts[i]); j++ {
497 mode := parts[i][j]
498 modes[mode] = mt
499 }
500 }
501 uc.availableChannelModes = modes
502 case "CHANTYPES":
503 uc.availableChannelTypes = value
504 case "PREFIX":
505 if value == "" {
506 uc.availableMemberships = nil
507 } else {
508 if value[0] != '(' {
509 return fmt.Errorf("malformed ISUPPORT PREFIX value: %v", value)
510 }
511 sep := strings.IndexByte(value, ')')
512 if sep < 0 || len(value) != sep*2 {
513 return fmt.Errorf("malformed ISUPPORT PREFIX value: %v", value)
514 }
515 memberships := make([]membership, len(value)/2-1)
516 for i := range memberships {
517 memberships[i] = membership{
518 Mode: value[i+1],
519 Prefix: value[sep+i+1],
520 }
521 }
522 uc.availableMemberships = memberships
523 }
524 }
525 } else {
526 // TODO: handle ISUPPORT negations
527 }
528 }
529 case "BATCH":
530 var tag string
531 if err := parseMessageParams(msg, &tag); err != nil {
532 return err
533 }
534
535 if strings.HasPrefix(tag, "+") {
536 tag = tag[1:]
537 if _, ok := uc.batches[tag]; ok {
538 return fmt.Errorf("unexpected BATCH reference tag: batch was already defined: %q", tag)
539 }
540 var batchType string
541 if err := parseMessageParams(msg, nil, &batchType); err != nil {
542 return err
543 }
544 label := label
545 if label == "" && msgBatch != nil {
546 label = msgBatch.Label
547 }
548 uc.batches[tag] = batch{
549 Type: batchType,
550 Params: msg.Params[2:],
551 Outer: msgBatch,
552 Label: label,
553 }
554 } else if strings.HasPrefix(tag, "-") {
555 tag = tag[1:]
556 if _, ok := uc.batches[tag]; !ok {
557 return fmt.Errorf("unknown BATCH reference tag: %q", tag)
558 }
559 delete(uc.batches, tag)
560 } else {
561 return fmt.Errorf("unexpected BATCH reference tag: missing +/- prefix: %q", tag)
562 }
563 case "NICK":
564 if msg.Prefix == nil {
565 return fmt.Errorf("expected a prefix")
566 }
567
568 var newNick string
569 if err := parseMessageParams(msg, &newNick); err != nil {
570 return err
571 }
572
573 me := false
574 if msg.Prefix.Name == uc.nick {
575 uc.logger.Printf("changed nick from %q to %q", uc.nick, newNick)
576 me = true
577 uc.nick = newNick
578 }
579
580 for _, ch := range uc.channels {
581 if membership, ok := ch.Members[msg.Prefix.Name]; ok {
582 delete(ch.Members, msg.Prefix.Name)
583 ch.Members[newNick] = membership
584 uc.appendLog(ch.Name, msg)
585 uc.appendHistory(ch.Name, msg)
586 }
587 }
588
589 if !me {
590 uc.forEachDownstream(func(dc *downstreamConn) {
591 dc.SendMessage(dc.marshalMessage(msg, uc.network))
592 })
593 }
594 case "JOIN":
595 if msg.Prefix == nil {
596 return fmt.Errorf("expected a prefix")
597 }
598
599 var channels string
600 if err := parseMessageParams(msg, &channels); err != nil {
601 return err
602 }
603
604 for _, ch := range strings.Split(channels, ",") {
605 if msg.Prefix.Name == uc.nick {
606 uc.logger.Printf("joined channel %q", ch)
607 uc.channels[ch] = &upstreamChannel{
608 Name: ch,
609 conn: uc,
610 Members: make(map[string]*membership),
611 }
612
613 uc.SendMessage(&irc.Message{
614 Command: "MODE",
615 Params: []string{ch},
616 })
617 } else {
618 ch, err := uc.getChannel(ch)
619 if err != nil {
620 return err
621 }
622 ch.Members[msg.Prefix.Name] = nil
623 }
624
625 chMsg := msg.Copy()
626 chMsg.Params[0] = ch
627 uc.produce(ch, chMsg, nil)
628 }
629 case "PART":
630 if msg.Prefix == nil {
631 return fmt.Errorf("expected a prefix")
632 }
633
634 var channels string
635 if err := parseMessageParams(msg, &channels); err != nil {
636 return err
637 }
638
639 for _, ch := range strings.Split(channels, ",") {
640 if msg.Prefix.Name == uc.nick {
641 uc.logger.Printf("parted channel %q", ch)
642 delete(uc.channels, ch)
643 } else {
644 ch, err := uc.getChannel(ch)
645 if err != nil {
646 return err
647 }
648 delete(ch.Members, msg.Prefix.Name)
649 }
650
651 chMsg := msg.Copy()
652 chMsg.Params[0] = ch
653 uc.produce(ch, chMsg, nil)
654 }
655 case "KICK":
656 if msg.Prefix == nil {
657 return fmt.Errorf("expected a prefix")
658 }
659
660 var channel, user string
661 if err := parseMessageParams(msg, &channel, &user); err != nil {
662 return err
663 }
664
665 if user == uc.nick {
666 uc.logger.Printf("kicked from channel %q by %s", channel, msg.Prefix.Name)
667 delete(uc.channels, channel)
668 } else {
669 ch, err := uc.getChannel(channel)
670 if err != nil {
671 return err
672 }
673 delete(ch.Members, user)
674 }
675
676 uc.produce(channel, msg, nil)
677 case "QUIT":
678 if msg.Prefix == nil {
679 return fmt.Errorf("expected a prefix")
680 }
681
682 if msg.Prefix.Name == uc.nick {
683 uc.logger.Printf("quit")
684 }
685
686 for _, ch := range uc.channels {
687 if _, ok := ch.Members[msg.Prefix.Name]; ok {
688 delete(ch.Members, msg.Prefix.Name)
689
690 uc.appendLog(ch.Name, msg)
691 uc.appendHistory(ch.Name, msg)
692 }
693 }
694
695 if msg.Prefix.Name != uc.nick {
696 uc.forEachDownstream(func(dc *downstreamConn) {
697 dc.SendMessage(dc.marshalMessage(msg, uc.network))
698 })
699 }
700 case irc.RPL_TOPIC, irc.RPL_NOTOPIC:
701 var name, topic string
702 if err := parseMessageParams(msg, nil, &name, &topic); err != nil {
703 return err
704 }
705 ch, err := uc.getChannel(name)
706 if err != nil {
707 return err
708 }
709 if msg.Command == irc.RPL_TOPIC {
710 ch.Topic = topic
711 } else {
712 ch.Topic = ""
713 }
714 case "TOPIC":
715 var name string
716 if err := parseMessageParams(msg, &name); err != nil {
717 return err
718 }
719 ch, err := uc.getChannel(name)
720 if err != nil {
721 return err
722 }
723 if len(msg.Params) > 1 {
724 ch.Topic = msg.Params[1]
725 } else {
726 ch.Topic = ""
727 }
728 uc.produce(ch.Name, msg, nil)
729 case "MODE":
730 var name, modeStr string
731 if err := parseMessageParams(msg, &name, &modeStr); err != nil {
732 return err
733 }
734
735 if !uc.isChannel(name) { // user mode change
736 if name != uc.nick {
737 return fmt.Errorf("received MODE message for unknown nick %q", name)
738 }
739 return uc.modes.Apply(modeStr)
740 // TODO: notify downstreams about user mode change?
741 } else { // channel mode change
742 ch, err := uc.getChannel(name)
743 if err != nil {
744 return err
745 }
746
747 if ch.modes != nil {
748 if err := ch.modes.Apply(uc.availableChannelModes, modeStr, msg.Params[2:]...); err != nil {
749 return err
750 }
751 }
752
753 uc.produce(ch.Name, msg, nil)
754 }
755 case irc.RPL_UMODEIS:
756 if err := parseMessageParams(msg, nil); err != nil {
757 return err
758 }
759 modeStr := ""
760 if len(msg.Params) > 1 {
761 modeStr = msg.Params[1]
762 }
763
764 uc.modes = ""
765 if err := uc.modes.Apply(modeStr); err != nil {
766 return err
767 }
768 // TODO: send RPL_UMODEIS to downstream connections when applicable
769 case irc.RPL_CHANNELMODEIS:
770 var channel string
771 if err := parseMessageParams(msg, nil, &channel); err != nil {
772 return err
773 }
774 modeStr := ""
775 if len(msg.Params) > 2 {
776 modeStr = msg.Params[2]
777 }
778
779 ch, err := uc.getChannel(channel)
780 if err != nil {
781 return err
782 }
783
784 firstMode := ch.modes == nil
785 ch.modes = make(map[byte]string)
786 if err := ch.modes.Apply(uc.availableChannelModes, modeStr, msg.Params[3:]...); err != nil {
787 return err
788 }
789 if firstMode {
790 modeStr, modeParams := ch.modes.Format()
791
792 uc.forEachDownstream(func(dc *downstreamConn) {
793 params := []string{dc.nick, dc.marshalEntity(uc.network, channel), modeStr}
794 params = append(params, modeParams...)
795
796 dc.SendMessage(&irc.Message{
797 Prefix: dc.srv.prefix(),
798 Command: irc.RPL_CHANNELMODEIS,
799 Params: params,
800 })
801 })
802 }
803 case rpl_creationtime:
804 var channel, creationTime string
805 if err := parseMessageParams(msg, nil, &channel, &creationTime); err != nil {
806 return err
807 }
808
809 ch, err := uc.getChannel(channel)
810 if err != nil {
811 return err
812 }
813
814 firstCreationTime := ch.creationTime == ""
815 ch.creationTime = creationTime
816 if firstCreationTime {
817 uc.forEachDownstream(func(dc *downstreamConn) {
818 dc.SendMessage(&irc.Message{
819 Prefix: dc.srv.prefix(),
820 Command: rpl_creationtime,
821 Params: []string{dc.nick, channel, creationTime},
822 })
823 })
824 }
825 case rpl_topicwhotime:
826 var name, who, timeStr string
827 if err := parseMessageParams(msg, nil, &name, &who, &timeStr); err != nil {
828 return err
829 }
830 ch, err := uc.getChannel(name)
831 if err != nil {
832 return err
833 }
834 ch.TopicWho = who
835 sec, err := strconv.ParseInt(timeStr, 10, 64)
836 if err != nil {
837 return fmt.Errorf("failed to parse topic time: %v", err)
838 }
839 ch.TopicTime = time.Unix(sec, 0)
840 case irc.RPL_LIST:
841 var channel, clients, topic string
842 if err := parseMessageParams(msg, nil, &channel, &clients, &topic); err != nil {
843 return err
844 }
845
846 pl := uc.getPendingLIST()
847 if pl == nil {
848 return fmt.Errorf("unexpected RPL_LIST: no matching pending LIST")
849 }
850
851 uc.forEachDownstreamByID(pl.downstreamID, func(dc *downstreamConn) {
852 dc.SendMessage(&irc.Message{
853 Prefix: dc.srv.prefix(),
854 Command: irc.RPL_LIST,
855 Params: []string{dc.nick, dc.marshalEntity(uc.network, channel), clients, topic},
856 })
857 })
858 case irc.RPL_LISTEND:
859 ok := uc.endPendingLISTs(false)
860 if !ok {
861 return fmt.Errorf("unexpected RPL_LISTEND: no matching pending LIST")
862 }
863 case irc.RPL_NAMREPLY:
864 var name, statusStr, members string
865 if err := parseMessageParams(msg, nil, &statusStr, &name, &members); err != nil {
866 return err
867 }
868
869 ch, ok := uc.channels[name]
870 if !ok {
871 // NAMES on a channel we have not joined, forward to downstream
872 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
873 channel := dc.marshalEntity(uc.network, name)
874 members := splitSpace(members)
875 for i, member := range members {
876 membership, nick := uc.parseMembershipPrefix(member)
877 members[i] = membership.String() + dc.marshalEntity(uc.network, nick)
878 }
879 memberStr := strings.Join(members, " ")
880
881 dc.SendMessage(&irc.Message{
882 Prefix: dc.srv.prefix(),
883 Command: irc.RPL_NAMREPLY,
884 Params: []string{dc.nick, statusStr, channel, memberStr},
885 })
886 })
887 return nil
888 }
889
890 status, err := parseChannelStatus(statusStr)
891 if err != nil {
892 return err
893 }
894 ch.Status = status
895
896 for _, s := range splitSpace(members) {
897 membership, nick := uc.parseMembershipPrefix(s)
898 ch.Members[nick] = membership
899 }
900 case irc.RPL_ENDOFNAMES:
901 var name string
902 if err := parseMessageParams(msg, nil, &name); err != nil {
903 return err
904 }
905
906 ch, ok := uc.channels[name]
907 if !ok {
908 // NAMES on a channel we have not joined, forward to downstream
909 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
910 channel := dc.marshalEntity(uc.network, name)
911
912 dc.SendMessage(&irc.Message{
913 Prefix: dc.srv.prefix(),
914 Command: irc.RPL_ENDOFNAMES,
915 Params: []string{dc.nick, channel, "End of /NAMES list"},
916 })
917 })
918 return nil
919 }
920
921 if ch.complete {
922 return fmt.Errorf("received unexpected RPL_ENDOFNAMES")
923 }
924 ch.complete = true
925
926 uc.forEachDownstream(func(dc *downstreamConn) {
927 forwardChannel(dc, ch)
928 })
929 case irc.RPL_WHOREPLY:
930 var channel, username, host, server, nick, mode, trailing string
931 if err := parseMessageParams(msg, nil, &channel, &username, &host, &server, &nick, &mode, &trailing); err != nil {
932 return err
933 }
934
935 parts := strings.SplitN(trailing, " ", 2)
936 if len(parts) != 2 {
937 return fmt.Errorf("received malformed RPL_WHOREPLY: wrong trailing parameter: %s", trailing)
938 }
939 realname := parts[1]
940 hops, err := strconv.Atoi(parts[0])
941 if err != nil {
942 return fmt.Errorf("received malformed RPL_WHOREPLY: wrong hop count: %s", parts[0])
943 }
944 hops++
945
946 trailing = strconv.Itoa(hops) + " " + realname
947
948 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
949 channel := channel
950 if channel != "*" {
951 channel = dc.marshalEntity(uc.network, channel)
952 }
953 nick := dc.marshalEntity(uc.network, nick)
954 dc.SendMessage(&irc.Message{
955 Prefix: dc.srv.prefix(),
956 Command: irc.RPL_WHOREPLY,
957 Params: []string{dc.nick, channel, username, host, server, nick, mode, trailing},
958 })
959 })
960 case irc.RPL_ENDOFWHO:
961 var name string
962 if err := parseMessageParams(msg, nil, &name); err != nil {
963 return err
964 }
965
966 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
967 name := name
968 if name != "*" {
969 // TODO: support WHO masks
970 name = dc.marshalEntity(uc.network, name)
971 }
972 dc.SendMessage(&irc.Message{
973 Prefix: dc.srv.prefix(),
974 Command: irc.RPL_ENDOFWHO,
975 Params: []string{dc.nick, name, "End of /WHO list"},
976 })
977 })
978 case irc.RPL_WHOISUSER:
979 var nick, username, host, realname string
980 if err := parseMessageParams(msg, nil, &nick, &username, &host, nil, &realname); err != nil {
981 return err
982 }
983
984 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
985 nick := dc.marshalEntity(uc.network, nick)
986 dc.SendMessage(&irc.Message{
987 Prefix: dc.srv.prefix(),
988 Command: irc.RPL_WHOISUSER,
989 Params: []string{dc.nick, nick, username, host, "*", realname},
990 })
991 })
992 case irc.RPL_WHOISSERVER:
993 var nick, server, serverInfo string
994 if err := parseMessageParams(msg, nil, &nick, &server, &serverInfo); err != nil {
995 return err
996 }
997
998 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
999 nick := dc.marshalEntity(uc.network, nick)
1000 dc.SendMessage(&irc.Message{
1001 Prefix: dc.srv.prefix(),
1002 Command: irc.RPL_WHOISSERVER,
1003 Params: []string{dc.nick, nick, server, serverInfo},
1004 })
1005 })
1006 case irc.RPL_WHOISOPERATOR:
1007 var nick string
1008 if err := parseMessageParams(msg, nil, &nick); err != nil {
1009 return err
1010 }
1011
1012 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1013 nick := dc.marshalEntity(uc.network, nick)
1014 dc.SendMessage(&irc.Message{
1015 Prefix: dc.srv.prefix(),
1016 Command: irc.RPL_WHOISOPERATOR,
1017 Params: []string{dc.nick, nick, "is an IRC operator"},
1018 })
1019 })
1020 case irc.RPL_WHOISIDLE:
1021 var nick string
1022 if err := parseMessageParams(msg, nil, &nick, nil); err != nil {
1023 return err
1024 }
1025
1026 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1027 nick := dc.marshalEntity(uc.network, nick)
1028 params := []string{dc.nick, nick}
1029 params = append(params, msg.Params[2:]...)
1030 dc.SendMessage(&irc.Message{
1031 Prefix: dc.srv.prefix(),
1032 Command: irc.RPL_WHOISIDLE,
1033 Params: params,
1034 })
1035 })
1036 case irc.RPL_WHOISCHANNELS:
1037 var nick, channelList string
1038 if err := parseMessageParams(msg, nil, &nick, &channelList); err != nil {
1039 return err
1040 }
1041 channels := splitSpace(channelList)
1042
1043 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1044 nick := dc.marshalEntity(uc.network, nick)
1045 channelList := make([]string, len(channels))
1046 for i, channel := range channels {
1047 prefix, channel := uc.parseMembershipPrefix(channel)
1048 channel = dc.marshalEntity(uc.network, channel)
1049 channelList[i] = prefix.String() + channel
1050 }
1051 channels := strings.Join(channelList, " ")
1052 dc.SendMessage(&irc.Message{
1053 Prefix: dc.srv.prefix(),
1054 Command: irc.RPL_WHOISCHANNELS,
1055 Params: []string{dc.nick, nick, channels},
1056 })
1057 })
1058 case irc.RPL_ENDOFWHOIS:
1059 var nick string
1060 if err := parseMessageParams(msg, nil, &nick); err != nil {
1061 return err
1062 }
1063
1064 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1065 nick := dc.marshalEntity(uc.network, nick)
1066 dc.SendMessage(&irc.Message{
1067 Prefix: dc.srv.prefix(),
1068 Command: irc.RPL_ENDOFWHOIS,
1069 Params: []string{dc.nick, nick, "End of /WHOIS list"},
1070 })
1071 })
1072 case "PRIVMSG":
1073 if msg.Prefix == nil {
1074 return fmt.Errorf("expected a prefix")
1075 }
1076
1077 var entity, text string
1078 if err := parseMessageParams(msg, &entity, &text); err != nil {
1079 return err
1080 }
1081
1082 if msg.Prefix.Name == serviceNick {
1083 uc.logger.Printf("skipping PRIVMSG from soju's service: %v", msg)
1084 break
1085 }
1086 if entity == serviceNick {
1087 uc.logger.Printf("skipping PRIVMSG to soju's service: %v", msg)
1088 break
1089 }
1090
1091 target := entity
1092 if target == uc.nick {
1093 target = msg.Prefix.Name
1094 }
1095 uc.produce(target, msg, nil)
1096 case "INVITE":
1097 var nick, channel string
1098 if err := parseMessageParams(msg, &nick, &channel); err != nil {
1099 return err
1100 }
1101
1102 uc.forEachDownstream(func(dc *downstreamConn) {
1103 dc.SendMessage(&irc.Message{
1104 Prefix: dc.marshalUserPrefix(uc.network, msg.Prefix),
1105 Command: "INVITE",
1106 Params: []string{dc.marshalEntity(uc.network, nick), dc.marshalEntity(uc.network, channel)},
1107 })
1108 })
1109 case irc.RPL_INVITING:
1110 var nick, channel string
1111 if err := parseMessageParams(msg, &nick, &channel); err != nil {
1112 return err
1113 }
1114
1115 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1116 dc.SendMessage(&irc.Message{
1117 Prefix: dc.srv.prefix(),
1118 Command: irc.RPL_INVITING,
1119 Params: []string{dc.nick, dc.marshalEntity(uc.network, nick), dc.marshalEntity(uc.network, channel)},
1120 })
1121 })
1122 case irc.ERR_UNKNOWNCOMMAND, irc.RPL_TRYAGAIN:
1123 var command, reason string
1124 if err := parseMessageParams(msg, nil, &command, &reason); err != nil {
1125 return err
1126 }
1127
1128 if command == "LIST" {
1129 ok := uc.endPendingLISTs(false)
1130 if !ok {
1131 return fmt.Errorf("unexpected response for LIST: %q: no matching pending LIST", msg.Command)
1132 }
1133 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1134 dc.SendMessage(&irc.Message{
1135 Prefix: uc.srv.prefix(),
1136 Command: msg.Command,
1137 Params: []string{dc.nick, "LIST", reason},
1138 })
1139 })
1140 }
1141 case irc.RPL_AWAY:
1142 var nick, reason string
1143 if err := parseMessageParams(msg, nil, &nick, &reason); err != nil {
1144 return err
1145 }
1146
1147 uc.forEachDownstream(func(dc *downstreamConn) {
1148 dc.SendMessage(&irc.Message{
1149 Prefix: dc.srv.prefix(),
1150 Command: irc.RPL_AWAY,
1151 Params: []string{dc.nick, dc.marshalEntity(uc.network, nick), reason},
1152 })
1153 })
1154 case "AWAY":
1155 if msg.Prefix == nil {
1156 return fmt.Errorf("expected a prefix")
1157 }
1158
1159 uc.forEachDownstream(func(dc *downstreamConn) {
1160 if !dc.caps["away-notify"] {
1161 return
1162 }
1163 dc.SendMessage(&irc.Message{
1164 Prefix: dc.marshalUserPrefix(uc.network, msg.Prefix),
1165 Command: "AWAY",
1166 Params: msg.Params,
1167 })
1168 })
1169 case "TAGMSG":
1170 // TODO: relay to downstream connections that accept message-tags
1171 case "ACK":
1172 // Ignore
1173 case irc.RPL_NOWAWAY, irc.RPL_UNAWAY:
1174 // Ignore
1175 case irc.RPL_YOURHOST, irc.RPL_CREATED:
1176 // Ignore
1177 case irc.RPL_LUSERCLIENT, irc.RPL_LUSEROP, irc.RPL_LUSERUNKNOWN, irc.RPL_LUSERCHANNELS, irc.RPL_LUSERME:
1178 // Ignore
1179 case irc.RPL_MOTDSTART, irc.RPL_MOTD, irc.RPL_ENDOFMOTD:
1180 // Ignore
1181 case irc.RPL_LISTSTART:
1182 // Ignore
1183 case rpl_localusers, rpl_globalusers:
1184 // Ignore
1185 case irc.RPL_STATSVLINE, rpl_statsping, irc.RPL_STATSBLINE, irc.RPL_STATSDLINE:
1186 // Ignore
1187 default:
1188 uc.logger.Printf("unhandled message: %v", msg)
1189 }
1190 return nil
1191}
1192
// splitSpace splits s around runs of the ASCII space character (' ') and
// returns the non-empty pieces. Unlike strings.Fields, no other whitespace
// (tabs, newlines, ...) is treated as a separator.
func splitSpace(s string) []string {
	isSpace := func(c rune) bool {
		return c == ' '
	}
	return strings.FieldsFunc(s, isSpace)
}
1198
1199func (uc *upstreamConn) register() {
1200 uc.nick = uc.network.Nick
1201 uc.username = uc.network.Username
1202 if uc.username == "" {
1203 uc.username = uc.nick
1204 }
1205 uc.realname = uc.network.Realname
1206 if uc.realname == "" {
1207 uc.realname = uc.nick
1208 }
1209
1210 uc.SendMessage(&irc.Message{
1211 Command: "CAP",
1212 Params: []string{"LS", "302"},
1213 })
1214
1215 if uc.network.Pass != "" {
1216 uc.SendMessage(&irc.Message{
1217 Command: "PASS",
1218 Params: []string{uc.network.Pass},
1219 })
1220 }
1221
1222 uc.SendMessage(&irc.Message{
1223 Command: "NICK",
1224 Params: []string{uc.nick},
1225 })
1226 uc.SendMessage(&irc.Message{
1227 Command: "USER",
1228 Params: []string{uc.username, "0", "*", uc.realname},
1229 })
1230}
1231
1232func (uc *upstreamConn) runUntilRegistered() error {
1233 for !uc.registered {
1234 msg, err := uc.ReadMessage()
1235 if err != nil {
1236 return fmt.Errorf("failed to read message: %v", err)
1237 }
1238
1239 if err := uc.handleMessage(msg); err != nil {
1240 return fmt.Errorf("failed to handle message %q: %v", msg, err)
1241 }
1242 }
1243
1244 for _, command := range uc.network.ConnectCommands {
1245 m, err := irc.ParseMessage(command)
1246 if err != nil {
1247 uc.logger.Printf("failed to parse connect command %q: %v", command, err)
1248 } else {
1249 uc.SendMessage(m)
1250 }
1251 }
1252
1253 return nil
1254}
1255
1256func (uc *upstreamConn) requestSASL() bool {
1257 if uc.network.SASL.Mechanism == "" {
1258 return false
1259 }
1260
1261 v, ok := uc.supportedCaps["sasl"]
1262 if !ok {
1263 return false
1264 }
1265 if v != "" {
1266 mechanisms := strings.Split(v, ",")
1267 found := false
1268 for _, mech := range mechanisms {
1269 if strings.EqualFold(mech, uc.network.SASL.Mechanism) {
1270 found = true
1271 break
1272 }
1273 }
1274 if !found {
1275 return false
1276 }
1277 }
1278
1279 return true
1280}
1281
1282func (uc *upstreamConn) handleCapAck(name string, ok bool) error {
1283 uc.caps[name] = ok
1284
1285 switch name {
1286 case "sasl":
1287 if !ok {
1288 uc.logger.Printf("server refused to acknowledge the SASL capability")
1289 return nil
1290 }
1291
1292 auth := &uc.network.SASL
1293 switch auth.Mechanism {
1294 case "PLAIN":
1295 uc.logger.Printf("starting SASL PLAIN authentication with username %q", auth.Plain.Username)
1296 uc.saslClient = sasl.NewPlainClient("", auth.Plain.Username, auth.Plain.Password)
1297 default:
1298 return fmt.Errorf("unsupported SASL mechanism %q", name)
1299 }
1300
1301 uc.SendMessage(&irc.Message{
1302 Command: "AUTHENTICATE",
1303 Params: []string{auth.Mechanism},
1304 })
1305 case "message-tags", "labeled-response", "away-notify", "batch", "server-time":
1306 // Nothing to do
1307 default:
1308 uc.logger.Printf("received CAP ACK/NAK for a cap we don't support: %v", name)
1309 }
1310 return nil
1311}
1312
1313func (uc *upstreamConn) readMessages(ch chan<- event) error {
1314 for {
1315 msg, err := uc.ReadMessage()
1316 if err == io.EOF {
1317 break
1318 } else if err != nil {
1319 return fmt.Errorf("failed to read IRC command: %v", err)
1320 }
1321
1322 ch <- eventUpstreamMessage{msg, uc}
1323 }
1324
1325 return nil
1326}
1327
1328func (uc *upstreamConn) SendMessageLabeled(downstreamID uint64, msg *irc.Message) {
1329 if uc.caps["labeled-response"] {
1330 if msg.Tags == nil {
1331 msg.Tags = make(map[string]irc.TagValue)
1332 }
1333 msg.Tags["label"] = irc.TagValue(fmt.Sprintf("sd-%d-%d", downstreamID, uc.nextLabelID))
1334 uc.nextLabelID++
1335 }
1336 uc.SendMessage(msg)
1337}
1338
1339// TODO: handle moving logs when a network name changes, when support for this is added
1340func (uc *upstreamConn) appendLog(entity string, msg *irc.Message) {
1341 if uc.srv.LogPath == "" {
1342 return
1343 }
1344
1345 ml, ok := uc.messageLoggers[entity]
1346 if !ok {
1347 ml = newMessageLogger(uc.network, entity)
1348 uc.messageLoggers[entity] = ml
1349 }
1350
1351 if err := ml.Append(msg); err != nil {
1352 uc.logger.Printf("failed to log message: %v", err)
1353 }
1354}
1355
1356// appendHistory appends a message to the history. entity can be empty.
1357func (uc *upstreamConn) appendHistory(entity string, msg *irc.Message) {
1358 // If no client is offline, no need to append the message to the buffer
1359 if len(uc.network.offlineClients) == 0 {
1360 return
1361 }
1362
1363 history, ok := uc.network.history[entity]
1364 if !ok {
1365 history = &networkHistory{
1366 offlineClients: make(map[string]uint64),
1367 ring: NewRing(uc.srv.RingCap),
1368 }
1369 uc.network.history[entity] = history
1370
1371 for clientName, _ := range uc.network.offlineClients {
1372 history.offlineClients[clientName] = 0
1373 }
1374 }
1375
1376 history.ring.Produce(msg)
1377}
1378
1379// produce appends a message to the logs, adds it to the history and forwards
1380// it to connected downstream connections.
1381//
1382// If origin is not nil and origin doesn't support echo-message, the message is
1383// forwarded to all connections except origin.
1384func (uc *upstreamConn) produce(target string, msg *irc.Message, origin *downstreamConn) {
1385 if target != "" {
1386 uc.appendLog(target, msg)
1387 }
1388
1389 uc.appendHistory(target, msg)
1390
1391 uc.forEachDownstream(func(dc *downstreamConn) {
1392 if dc != origin || dc.caps["echo-message"] {
1393 dc.SendMessage(dc.marshalMessage(msg, uc.network))
1394 }
1395 })
1396}
1397
1398func (uc *upstreamConn) updateAway() {
1399 away := true
1400 uc.forEachDownstream(func(*downstreamConn) {
1401 away = false
1402 })
1403 if away == uc.away {
1404 return
1405 }
1406 if away {
1407 uc.SendMessage(&irc.Message{
1408 Command: "AWAY",
1409 Params: []string{"Auto away"},
1410 })
1411 } else {
1412 uc.SendMessage(&irc.Message{
1413 Command: "AWAY",
1414 })
1415 }
1416 uc.away = away
1417}
Note: See TracBrowser for help on using the repository browser.