source: code/trunk/upstream.go@ 269

Last change on this file since 269 was 269, checked in by delthas, 5 years ago

Add support for IRC address schemes

This is preparatory work for adding other connection types to upstream
servers. The service command network create now accepts a scheme in
the address flag, which specifies how to connect to the upstream server.

The only supported scheme for now is ircs, which is also the default if
no scheme is specified. ircs connects to a network over a TLS TCP
connection.

File size: 33.9 KB
Line 
1package soju
2
3import (
4 "crypto/tls"
5 "encoding/base64"
6 "errors"
7 "fmt"
8 "io"
9 "net"
10 "strconv"
11 "strings"
12 "time"
13
14 "github.com/emersion/go-sasl"
15 "gopkg.in/irc.v3"
16)
17
18type upstreamChannel struct {
19 Name string
20 conn *upstreamConn
21 Topic string
22 TopicWho string
23 TopicTime time.Time
24 Status channelStatus
25 modes channelModes
26 creationTime string
27 Members map[string]*membership
28 complete bool
29}
30
31type upstreamConn struct {
32 conn
33
34 network *network
35 user *user
36
37 serverName string
38 availableUserModes string
39 availableChannelModes map[byte]channelModeType
40 availableChannelTypes string
41 availableMemberships []membership
42
43 registered bool
44 nick string
45 username string
46 realname string
47 modes userModes
48 channels map[string]*upstreamChannel
49 caps map[string]string
50 batches map[string]batch
51 away bool
52
53 tagsSupported bool
54 labelsSupported bool
55 nextLabelID uint64
56
57 saslClient sasl.Client
58 saslStarted bool
59
60 // set of LIST commands in progress, per downstream
61 pendingLISTDownstreamSet map[uint64]struct{}
62
63 messageLoggers map[string]*messageLogger
64}
65
66func connectToUpstream(network *network) (*upstreamConn, error) {
67 logger := &prefixLogger{network.user.srv.Logger, fmt.Sprintf("upstream %q: ", network.Addr)}
68
69 var scheme string
70 var addr string
71
72 addrParts := strings.SplitN(network.Addr, "://", 2)
73 if len(addrParts) == 2 {
74 scheme = addrParts[0]
75 addr = addrParts[1]
76 } else {
77 scheme = "ircs"
78 addr = addrParts[0]
79 }
80
81 dialer := net.Dialer{Timeout: connectTimeout}
82
83 var netConn net.Conn
84 var err error
85 switch scheme {
86 case "ircs":
87 if !strings.ContainsRune(addr, ':') {
88 addr = addr + ":6697"
89 }
90
91 logger.Printf("connecting to TLS server at address %q", addr)
92 netConn, err = tls.DialWithDialer(&dialer, "tcp", addr, nil)
93 default:
94 return nil, fmt.Errorf("failed to dial %q: unknown scheme: %v", addr, scheme)
95 }
96 if err != nil {
97 return nil, fmt.Errorf("failed to dial %q: %v", addr, err)
98 }
99
100 uc := &upstreamConn{
101 conn: *newConn(network.user.srv, netConn, logger),
102 network: network,
103 user: network.user,
104 channels: make(map[string]*upstreamChannel),
105 caps: make(map[string]string),
106 batches: make(map[string]batch),
107 availableChannelTypes: stdChannelTypes,
108 availableChannelModes: stdChannelModes,
109 availableMemberships: stdMemberships,
110 pendingLISTDownstreamSet: make(map[uint64]struct{}),
111 messageLoggers: make(map[string]*messageLogger),
112 }
113
114 return uc, nil
115}
116
117func (uc *upstreamConn) forEachDownstream(f func(*downstreamConn)) {
118 uc.network.forEachDownstream(f)
119}
120
121func (uc *upstreamConn) forEachDownstreamByID(id uint64, f func(*downstreamConn)) {
122 uc.forEachDownstream(func(dc *downstreamConn) {
123 if id != 0 && id != dc.id {
124 return
125 }
126 f(dc)
127 })
128}
129
130func (uc *upstreamConn) getChannel(name string) (*upstreamChannel, error) {
131 ch, ok := uc.channels[name]
132 if !ok {
133 return nil, fmt.Errorf("unknown channel %q", name)
134 }
135 return ch, nil
136}
137
138func (uc *upstreamConn) isChannel(entity string) bool {
139 if i := strings.IndexByte(uc.availableChannelTypes, entity[0]); i >= 0 {
140 return true
141 }
142 return false
143}
144
145func (uc *upstreamConn) getPendingLIST() *pendingLIST {
146 for _, pl := range uc.user.pendingLISTs {
147 if _, ok := pl.pendingCommands[uc.network.ID]; !ok {
148 continue
149 }
150 return &pl
151 }
152 return nil
153}
154
155func (uc *upstreamConn) endPendingLISTs(all bool) (found bool) {
156 found = false
157 for i := 0; i < len(uc.user.pendingLISTs); i++ {
158 pl := uc.user.pendingLISTs[i]
159 if _, ok := pl.pendingCommands[uc.network.ID]; !ok {
160 continue
161 }
162 delete(pl.pendingCommands, uc.network.ID)
163 if len(pl.pendingCommands) == 0 {
164 uc.user.pendingLISTs = append(uc.user.pendingLISTs[:i], uc.user.pendingLISTs[i+1:]...)
165 i--
166 uc.forEachDownstreamByID(pl.downstreamID, func(dc *downstreamConn) {
167 dc.SendMessage(&irc.Message{
168 Prefix: dc.srv.prefix(),
169 Command: irc.RPL_LISTEND,
170 Params: []string{dc.nick, "End of /LIST"},
171 })
172 })
173 }
174 found = true
175 if !all {
176 delete(uc.pendingLISTDownstreamSet, pl.downstreamID)
177 uc.user.forEachUpstream(func(uc *upstreamConn) {
178 uc.trySendLIST(pl.downstreamID)
179 })
180 return
181 }
182 }
183 return
184}
185
186func (uc *upstreamConn) trySendLIST(downstreamID uint64) {
187 if _, ok := uc.pendingLISTDownstreamSet[downstreamID]; ok {
188 // a LIST command is already pending
189 // we will try again when that command is completed
190 return
191 }
192
193 for _, pl := range uc.user.pendingLISTs {
194 if pl.downstreamID != downstreamID {
195 continue
196 }
197 // this is the first pending LIST command list of the downstream
198 listCommand, ok := pl.pendingCommands[uc.network.ID]
199 if !ok {
200 // there is no command for this upstream in these LIST commands
201 // do not send anything
202 continue
203 }
204 // there is a command for this upstream in these LIST commands
205 // send it now
206
207 uc.SendMessageLabeled(downstreamID, listCommand)
208
209 uc.pendingLISTDownstreamSet[downstreamID] = struct{}{}
210 return
211 }
212}
213
214func (uc *upstreamConn) parseMembershipPrefix(s string) (membership *membership, nick string) {
215 for _, m := range uc.availableMemberships {
216 if m.Prefix == s[0] {
217 return &m, s[1:]
218 }
219 }
220 return nil, s
221}
222
223func (uc *upstreamConn) handleMessage(msg *irc.Message) error {
224 var label string
225 if l, ok := msg.GetTag("label"); ok {
226 label = l
227 }
228
229 var msgBatch *batch
230 if batchName, ok := msg.GetTag("batch"); ok {
231 b, ok := uc.batches[batchName]
232 if !ok {
233 return fmt.Errorf("unexpected batch reference: batch was not defined: %q", batchName)
234 }
235 msgBatch = &b
236 if label == "" {
237 label = msgBatch.Label
238 }
239 }
240
241 var downstreamID uint64 = 0
242 if label != "" {
243 var labelOffset uint64
244 n, err := fmt.Sscanf(label, "sd-%d-%d", &downstreamID, &labelOffset)
245 if err == nil && n < 2 {
246 err = errors.New("not enough arguments")
247 }
248 if err != nil {
249 return fmt.Errorf("unexpected message label: invalid downstream reference for label %q: %v", label, err)
250 }
251 }
252
253 if _, ok := msg.Tags["time"]; !ok {
254 msg.Tags["time"] = irc.TagValue(time.Now().UTC().Format(serverTimeLayout))
255 }
256
257 switch msg.Command {
258 case "PING":
259 uc.SendMessage(&irc.Message{
260 Command: "PONG",
261 Params: msg.Params,
262 })
263 return nil
264 case "NOTICE":
265 if msg.Prefix.User == "" && msg.Prefix.Host == "" { // server message
266 uc.produce("", msg, nil)
267 } else { // regular user NOTICE
268 var entity, text string
269 if err := parseMessageParams(msg, &entity, &text); err != nil {
270 return err
271 }
272
273 target := entity
274 if target == uc.nick {
275 target = msg.Prefix.Name
276 }
277 uc.produce(target, msg, nil)
278 }
279 case "CAP":
280 var subCmd string
281 if err := parseMessageParams(msg, nil, &subCmd); err != nil {
282 return err
283 }
284 subCmd = strings.ToUpper(subCmd)
285 subParams := msg.Params[2:]
286 switch subCmd {
287 case "LS":
288 if len(subParams) < 1 {
289 return newNeedMoreParamsError(msg.Command)
290 }
291 caps := strings.Fields(subParams[len(subParams)-1])
292 more := len(subParams) >= 2 && msg.Params[len(subParams)-2] == "*"
293
294 for _, s := range caps {
295 kv := strings.SplitN(s, "=", 2)
296 k := strings.ToLower(kv[0])
297 var v string
298 if len(kv) == 2 {
299 v = kv[1]
300 }
301 uc.caps[k] = v
302 }
303
304 if more {
305 break // wait to receive all capabilities
306 }
307
308 requestCaps := make([]string, 0, 16)
309 for _, c := range []string{"message-tags", "batch", "labeled-response", "server-time"} {
310 if _, ok := uc.caps[c]; ok {
311 requestCaps = append(requestCaps, c)
312 }
313 }
314
315 if uc.requestSASL() {
316 requestCaps = append(requestCaps, "sasl")
317 }
318
319 if len(requestCaps) > 0 {
320 uc.SendMessage(&irc.Message{
321 Command: "CAP",
322 Params: []string{"REQ", strings.Join(requestCaps, " ")},
323 })
324 }
325
326 if uc.requestSASL() {
327 break // we'll send CAP END after authentication is completed
328 }
329
330 uc.SendMessage(&irc.Message{
331 Command: "CAP",
332 Params: []string{"END"},
333 })
334 case "ACK", "NAK":
335 if len(subParams) < 1 {
336 return newNeedMoreParamsError(msg.Command)
337 }
338 caps := strings.Fields(subParams[0])
339
340 for _, name := range caps {
341 if err := uc.handleCapAck(strings.ToLower(name), subCmd == "ACK"); err != nil {
342 return err
343 }
344 }
345
346 if uc.saslClient == nil {
347 uc.SendMessage(&irc.Message{
348 Command: "CAP",
349 Params: []string{"END"},
350 })
351 }
352 default:
353 uc.logger.Printf("unhandled message: %v", msg)
354 }
355 case "AUTHENTICATE":
356 if uc.saslClient == nil {
357 return fmt.Errorf("received unexpected AUTHENTICATE message")
358 }
359
360 // TODO: if a challenge is 400 bytes long, buffer it
361 var challengeStr string
362 if err := parseMessageParams(msg, &challengeStr); err != nil {
363 uc.SendMessage(&irc.Message{
364 Command: "AUTHENTICATE",
365 Params: []string{"*"},
366 })
367 return err
368 }
369
370 var challenge []byte
371 if challengeStr != "+" {
372 var err error
373 challenge, err = base64.StdEncoding.DecodeString(challengeStr)
374 if err != nil {
375 uc.SendMessage(&irc.Message{
376 Command: "AUTHENTICATE",
377 Params: []string{"*"},
378 })
379 return err
380 }
381 }
382
383 var resp []byte
384 var err error
385 if !uc.saslStarted {
386 _, resp, err = uc.saslClient.Start()
387 uc.saslStarted = true
388 } else {
389 resp, err = uc.saslClient.Next(challenge)
390 }
391 if err != nil {
392 uc.SendMessage(&irc.Message{
393 Command: "AUTHENTICATE",
394 Params: []string{"*"},
395 })
396 return err
397 }
398
399 // TODO: send response in multiple chunks if >= 400 bytes
400 var respStr = "+"
401 if resp != nil {
402 respStr = base64.StdEncoding.EncodeToString(resp)
403 }
404
405 uc.SendMessage(&irc.Message{
406 Command: "AUTHENTICATE",
407 Params: []string{respStr},
408 })
409 case irc.RPL_LOGGEDIN:
410 var account string
411 if err := parseMessageParams(msg, nil, nil, &account); err != nil {
412 return err
413 }
414 uc.logger.Printf("logged in with account %q", account)
415 case irc.RPL_LOGGEDOUT:
416 uc.logger.Printf("logged out")
417 case irc.ERR_NICKLOCKED, irc.RPL_SASLSUCCESS, irc.ERR_SASLFAIL, irc.ERR_SASLTOOLONG, irc.ERR_SASLABORTED:
418 var info string
419 if err := parseMessageParams(msg, nil, &info); err != nil {
420 return err
421 }
422 switch msg.Command {
423 case irc.ERR_NICKLOCKED:
424 uc.logger.Printf("invalid nick used with SASL authentication: %v", info)
425 case irc.ERR_SASLFAIL:
426 uc.logger.Printf("SASL authentication failed: %v", info)
427 case irc.ERR_SASLTOOLONG:
428 uc.logger.Printf("SASL message too long: %v", info)
429 }
430
431 uc.saslClient = nil
432 uc.saslStarted = false
433
434 uc.SendMessage(&irc.Message{
435 Command: "CAP",
436 Params: []string{"END"},
437 })
438 case irc.RPL_WELCOME:
439 uc.registered = true
440 uc.logger.Printf("connection registered")
441
442 for _, ch := range uc.network.channels {
443 params := []string{ch.Name}
444 if ch.Key != "" {
445 params = append(params, ch.Key)
446 }
447 uc.SendMessage(&irc.Message{
448 Command: "JOIN",
449 Params: params,
450 })
451 }
452 case irc.RPL_MYINFO:
453 if err := parseMessageParams(msg, nil, &uc.serverName, nil, &uc.availableUserModes, nil); err != nil {
454 return err
455 }
456 case irc.RPL_ISUPPORT:
457 if err := parseMessageParams(msg, nil, nil); err != nil {
458 return err
459 }
460 for _, token := range msg.Params[1 : len(msg.Params)-1] {
461 negate := false
462 parameter := token
463 value := ""
464 if strings.HasPrefix(token, "-") {
465 negate = true
466 token = token[1:]
467 } else {
468 if i := strings.IndexByte(token, '='); i >= 0 {
469 parameter = token[:i]
470 value = token[i+1:]
471 }
472 }
473 if !negate {
474 switch parameter {
475 case "CHANMODES":
476 parts := strings.SplitN(value, ",", 5)
477 if len(parts) < 4 {
478 return fmt.Errorf("malformed ISUPPORT CHANMODES value: %v", value)
479 }
480 modes := make(map[byte]channelModeType)
481 for i, mt := range []channelModeType{modeTypeA, modeTypeB, modeTypeC, modeTypeD} {
482 for j := 0; j < len(parts[i]); j++ {
483 mode := parts[i][j]
484 modes[mode] = mt
485 }
486 }
487 uc.availableChannelModes = modes
488 case "CHANTYPES":
489 uc.availableChannelTypes = value
490 case "PREFIX":
491 if value == "" {
492 uc.availableMemberships = nil
493 } else {
494 if value[0] != '(' {
495 return fmt.Errorf("malformed ISUPPORT PREFIX value: %v", value)
496 }
497 sep := strings.IndexByte(value, ')')
498 if sep < 0 || len(value) != sep*2 {
499 return fmt.Errorf("malformed ISUPPORT PREFIX value: %v", value)
500 }
501 memberships := make([]membership, len(value)/2-1)
502 for i := range memberships {
503 memberships[i] = membership{
504 Mode: value[i+1],
505 Prefix: value[sep+i+1],
506 }
507 }
508 uc.availableMemberships = memberships
509 }
510 }
511 } else {
512 // TODO: handle ISUPPORT negations
513 }
514 }
515 case "BATCH":
516 var tag string
517 if err := parseMessageParams(msg, &tag); err != nil {
518 return err
519 }
520
521 if strings.HasPrefix(tag, "+") {
522 tag = tag[1:]
523 if _, ok := uc.batches[tag]; ok {
524 return fmt.Errorf("unexpected BATCH reference tag: batch was already defined: %q", tag)
525 }
526 var batchType string
527 if err := parseMessageParams(msg, nil, &batchType); err != nil {
528 return err
529 }
530 label := label
531 if label == "" && msgBatch != nil {
532 label = msgBatch.Label
533 }
534 uc.batches[tag] = batch{
535 Type: batchType,
536 Params: msg.Params[2:],
537 Outer: msgBatch,
538 Label: label,
539 }
540 } else if strings.HasPrefix(tag, "-") {
541 tag = tag[1:]
542 if _, ok := uc.batches[tag]; !ok {
543 return fmt.Errorf("unknown BATCH reference tag: %q", tag)
544 }
545 delete(uc.batches, tag)
546 } else {
547 return fmt.Errorf("unexpected BATCH reference tag: missing +/- prefix: %q", tag)
548 }
549 case "NICK":
550 if msg.Prefix == nil {
551 return fmt.Errorf("expected a prefix")
552 }
553
554 var newNick string
555 if err := parseMessageParams(msg, &newNick); err != nil {
556 return err
557 }
558
559 me := false
560 if msg.Prefix.Name == uc.nick {
561 uc.logger.Printf("changed nick from %q to %q", uc.nick, newNick)
562 me = true
563 uc.nick = newNick
564 }
565
566 for _, ch := range uc.channels {
567 if membership, ok := ch.Members[msg.Prefix.Name]; ok {
568 delete(ch.Members, msg.Prefix.Name)
569 ch.Members[newNick] = membership
570 uc.appendLog(ch.Name, msg)
571 uc.appendHistory(ch.Name, msg)
572 }
573 }
574
575 if !me {
576 uc.forEachDownstream(func(dc *downstreamConn) {
577 dc.SendMessage(dc.marshalMessage(msg, uc.network))
578 })
579 }
580 case "JOIN":
581 if msg.Prefix == nil {
582 return fmt.Errorf("expected a prefix")
583 }
584
585 var channels string
586 if err := parseMessageParams(msg, &channels); err != nil {
587 return err
588 }
589
590 for _, ch := range strings.Split(channels, ",") {
591 if msg.Prefix.Name == uc.nick {
592 uc.logger.Printf("joined channel %q", ch)
593 uc.channels[ch] = &upstreamChannel{
594 Name: ch,
595 conn: uc,
596 Members: make(map[string]*membership),
597 }
598
599 uc.SendMessage(&irc.Message{
600 Command: "MODE",
601 Params: []string{ch},
602 })
603 } else {
604 ch, err := uc.getChannel(ch)
605 if err != nil {
606 return err
607 }
608 ch.Members[msg.Prefix.Name] = nil
609 }
610
611 chMsg := msg.Copy()
612 chMsg.Params[0] = ch
613 uc.produce(ch, chMsg, nil)
614 }
615 case "PART":
616 if msg.Prefix == nil {
617 return fmt.Errorf("expected a prefix")
618 }
619
620 var channels string
621 if err := parseMessageParams(msg, &channels); err != nil {
622 return err
623 }
624
625 for _, ch := range strings.Split(channels, ",") {
626 if msg.Prefix.Name == uc.nick {
627 uc.logger.Printf("parted channel %q", ch)
628 delete(uc.channels, ch)
629 } else {
630 ch, err := uc.getChannel(ch)
631 if err != nil {
632 return err
633 }
634 delete(ch.Members, msg.Prefix.Name)
635 }
636
637 chMsg := msg.Copy()
638 chMsg.Params[0] = ch
639 uc.produce(ch, chMsg, nil)
640 }
641 case "KICK":
642 if msg.Prefix == nil {
643 return fmt.Errorf("expected a prefix")
644 }
645
646 var channel, user string
647 if err := parseMessageParams(msg, &channel, &user); err != nil {
648 return err
649 }
650
651 if user == uc.nick {
652 uc.logger.Printf("kicked from channel %q by %s", channel, msg.Prefix.Name)
653 delete(uc.channels, channel)
654 } else {
655 ch, err := uc.getChannel(channel)
656 if err != nil {
657 return err
658 }
659 delete(ch.Members, user)
660 }
661
662 uc.produce(channel, msg, nil)
663 case "QUIT":
664 if msg.Prefix == nil {
665 return fmt.Errorf("expected a prefix")
666 }
667
668 if msg.Prefix.Name == uc.nick {
669 uc.logger.Printf("quit")
670 }
671
672 for _, ch := range uc.channels {
673 if _, ok := ch.Members[msg.Prefix.Name]; ok {
674 delete(ch.Members, msg.Prefix.Name)
675
676 uc.appendLog(ch.Name, msg)
677 uc.appendHistory(ch.Name, msg)
678 }
679 }
680
681 if msg.Prefix.Name != uc.nick {
682 uc.forEachDownstream(func(dc *downstreamConn) {
683 dc.SendMessage(dc.marshalMessage(msg, uc.network))
684 })
685 }
686 case irc.RPL_TOPIC, irc.RPL_NOTOPIC:
687 var name, topic string
688 if err := parseMessageParams(msg, nil, &name, &topic); err != nil {
689 return err
690 }
691 ch, err := uc.getChannel(name)
692 if err != nil {
693 return err
694 }
695 if msg.Command == irc.RPL_TOPIC {
696 ch.Topic = topic
697 } else {
698 ch.Topic = ""
699 }
700 case "TOPIC":
701 var name string
702 if err := parseMessageParams(msg, &name); err != nil {
703 return err
704 }
705 ch, err := uc.getChannel(name)
706 if err != nil {
707 return err
708 }
709 if len(msg.Params) > 1 {
710 ch.Topic = msg.Params[1]
711 } else {
712 ch.Topic = ""
713 }
714 uc.produce(ch.Name, msg, nil)
715 case "MODE":
716 var name, modeStr string
717 if err := parseMessageParams(msg, &name, &modeStr); err != nil {
718 return err
719 }
720
721 if !uc.isChannel(name) { // user mode change
722 if name != uc.nick {
723 return fmt.Errorf("received MODE message for unknown nick %q", name)
724 }
725 return uc.modes.Apply(modeStr)
726 // TODO: notify downstreams about user mode change?
727 } else { // channel mode change
728 ch, err := uc.getChannel(name)
729 if err != nil {
730 return err
731 }
732
733 if ch.modes != nil {
734 if err := ch.modes.Apply(uc.availableChannelModes, modeStr, msg.Params[2:]...); err != nil {
735 return err
736 }
737 }
738
739 uc.produce(ch.Name, msg, nil)
740 }
741 case irc.RPL_UMODEIS:
742 if err := parseMessageParams(msg, nil); err != nil {
743 return err
744 }
745 modeStr := ""
746 if len(msg.Params) > 1 {
747 modeStr = msg.Params[1]
748 }
749
750 uc.modes = ""
751 if err := uc.modes.Apply(modeStr); err != nil {
752 return err
753 }
754 // TODO: send RPL_UMODEIS to downstream connections when applicable
755 case irc.RPL_CHANNELMODEIS:
756 var channel string
757 if err := parseMessageParams(msg, nil, &channel); err != nil {
758 return err
759 }
760 modeStr := ""
761 if len(msg.Params) > 2 {
762 modeStr = msg.Params[2]
763 }
764
765 ch, err := uc.getChannel(channel)
766 if err != nil {
767 return err
768 }
769
770 firstMode := ch.modes == nil
771 ch.modes = make(map[byte]string)
772 if err := ch.modes.Apply(uc.availableChannelModes, modeStr, msg.Params[3:]...); err != nil {
773 return err
774 }
775 if firstMode {
776 modeStr, modeParams := ch.modes.Format()
777
778 uc.forEachDownstream(func(dc *downstreamConn) {
779 params := []string{dc.nick, dc.marshalEntity(uc.network, channel), modeStr}
780 params = append(params, modeParams...)
781
782 dc.SendMessage(&irc.Message{
783 Prefix: dc.srv.prefix(),
784 Command: irc.RPL_CHANNELMODEIS,
785 Params: params,
786 })
787 })
788 }
789 case rpl_creationtime:
790 var channel, creationTime string
791 if err := parseMessageParams(msg, nil, &channel, &creationTime); err != nil {
792 return err
793 }
794
795 ch, err := uc.getChannel(channel)
796 if err != nil {
797 return err
798 }
799
800 firstCreationTime := ch.creationTime == ""
801 ch.creationTime = creationTime
802 if firstCreationTime {
803 uc.forEachDownstream(func(dc *downstreamConn) {
804 dc.SendMessage(&irc.Message{
805 Prefix: dc.srv.prefix(),
806 Command: rpl_creationtime,
807 Params: []string{dc.nick, channel, creationTime},
808 })
809 })
810 }
811 case rpl_topicwhotime:
812 var name, who, timeStr string
813 if err := parseMessageParams(msg, nil, &name, &who, &timeStr); err != nil {
814 return err
815 }
816 ch, err := uc.getChannel(name)
817 if err != nil {
818 return err
819 }
820 ch.TopicWho = who
821 sec, err := strconv.ParseInt(timeStr, 10, 64)
822 if err != nil {
823 return fmt.Errorf("failed to parse topic time: %v", err)
824 }
825 ch.TopicTime = time.Unix(sec, 0)
826 case irc.RPL_LIST:
827 var channel, clients, topic string
828 if err := parseMessageParams(msg, nil, &channel, &clients, &topic); err != nil {
829 return err
830 }
831
832 pl := uc.getPendingLIST()
833 if pl == nil {
834 return fmt.Errorf("unexpected RPL_LIST: no matching pending LIST")
835 }
836
837 uc.forEachDownstreamByID(pl.downstreamID, func(dc *downstreamConn) {
838 dc.SendMessage(&irc.Message{
839 Prefix: dc.srv.prefix(),
840 Command: irc.RPL_LIST,
841 Params: []string{dc.nick, dc.marshalEntity(uc.network, channel), clients, topic},
842 })
843 })
844 case irc.RPL_LISTEND:
845 ok := uc.endPendingLISTs(false)
846 if !ok {
847 return fmt.Errorf("unexpected RPL_LISTEND: no matching pending LIST")
848 }
849 case irc.RPL_NAMREPLY:
850 var name, statusStr, members string
851 if err := parseMessageParams(msg, nil, &statusStr, &name, &members); err != nil {
852 return err
853 }
854
855 ch, ok := uc.channels[name]
856 if !ok {
857 // NAMES on a channel we have not joined, forward to downstream
858 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
859 channel := dc.marshalEntity(uc.network, name)
860 members := splitSpace(members)
861 for i, member := range members {
862 membership, nick := uc.parseMembershipPrefix(member)
863 members[i] = membership.String() + dc.marshalEntity(uc.network, nick)
864 }
865 memberStr := strings.Join(members, " ")
866
867 dc.SendMessage(&irc.Message{
868 Prefix: dc.srv.prefix(),
869 Command: irc.RPL_NAMREPLY,
870 Params: []string{dc.nick, statusStr, channel, memberStr},
871 })
872 })
873 return nil
874 }
875
876 status, err := parseChannelStatus(statusStr)
877 if err != nil {
878 return err
879 }
880 ch.Status = status
881
882 for _, s := range splitSpace(members) {
883 membership, nick := uc.parseMembershipPrefix(s)
884 ch.Members[nick] = membership
885 }
886 case irc.RPL_ENDOFNAMES:
887 var name string
888 if err := parseMessageParams(msg, nil, &name); err != nil {
889 return err
890 }
891
892 ch, ok := uc.channels[name]
893 if !ok {
894 // NAMES on a channel we have not joined, forward to downstream
895 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
896 channel := dc.marshalEntity(uc.network, name)
897
898 dc.SendMessage(&irc.Message{
899 Prefix: dc.srv.prefix(),
900 Command: irc.RPL_ENDOFNAMES,
901 Params: []string{dc.nick, channel, "End of /NAMES list"},
902 })
903 })
904 return nil
905 }
906
907 if ch.complete {
908 return fmt.Errorf("received unexpected RPL_ENDOFNAMES")
909 }
910 ch.complete = true
911
912 uc.forEachDownstream(func(dc *downstreamConn) {
913 forwardChannel(dc, ch)
914 })
915 case irc.RPL_WHOREPLY:
916 var channel, username, host, server, nick, mode, trailing string
917 if err := parseMessageParams(msg, nil, &channel, &username, &host, &server, &nick, &mode, &trailing); err != nil {
918 return err
919 }
920
921 parts := strings.SplitN(trailing, " ", 2)
922 if len(parts) != 2 {
923 return fmt.Errorf("received malformed RPL_WHOREPLY: wrong trailing parameter: %s", trailing)
924 }
925 realname := parts[1]
926 hops, err := strconv.Atoi(parts[0])
927 if err != nil {
928 return fmt.Errorf("received malformed RPL_WHOREPLY: wrong hop count: %s", parts[0])
929 }
930 hops++
931
932 trailing = strconv.Itoa(hops) + " " + realname
933
934 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
935 channel := channel
936 if channel != "*" {
937 channel = dc.marshalEntity(uc.network, channel)
938 }
939 nick := dc.marshalEntity(uc.network, nick)
940 dc.SendMessage(&irc.Message{
941 Prefix: dc.srv.prefix(),
942 Command: irc.RPL_WHOREPLY,
943 Params: []string{dc.nick, channel, username, host, server, nick, mode, trailing},
944 })
945 })
946 case irc.RPL_ENDOFWHO:
947 var name string
948 if err := parseMessageParams(msg, nil, &name); err != nil {
949 return err
950 }
951
952 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
953 name := name
954 if name != "*" {
955 // TODO: support WHO masks
956 name = dc.marshalEntity(uc.network, name)
957 }
958 dc.SendMessage(&irc.Message{
959 Prefix: dc.srv.prefix(),
960 Command: irc.RPL_ENDOFWHO,
961 Params: []string{dc.nick, name, "End of /WHO list"},
962 })
963 })
964 case irc.RPL_WHOISUSER:
965 var nick, username, host, realname string
966 if err := parseMessageParams(msg, nil, &nick, &username, &host, nil, &realname); err != nil {
967 return err
968 }
969
970 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
971 nick := dc.marshalEntity(uc.network, nick)
972 dc.SendMessage(&irc.Message{
973 Prefix: dc.srv.prefix(),
974 Command: irc.RPL_WHOISUSER,
975 Params: []string{dc.nick, nick, username, host, "*", realname},
976 })
977 })
978 case irc.RPL_WHOISSERVER:
979 var nick, server, serverInfo string
980 if err := parseMessageParams(msg, nil, &nick, &server, &serverInfo); err != nil {
981 return err
982 }
983
984 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
985 nick := dc.marshalEntity(uc.network, nick)
986 dc.SendMessage(&irc.Message{
987 Prefix: dc.srv.prefix(),
988 Command: irc.RPL_WHOISSERVER,
989 Params: []string{dc.nick, nick, server, serverInfo},
990 })
991 })
992 case irc.RPL_WHOISOPERATOR:
993 var nick string
994 if err := parseMessageParams(msg, nil, &nick); err != nil {
995 return err
996 }
997
998 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
999 nick := dc.marshalEntity(uc.network, nick)
1000 dc.SendMessage(&irc.Message{
1001 Prefix: dc.srv.prefix(),
1002 Command: irc.RPL_WHOISOPERATOR,
1003 Params: []string{dc.nick, nick, "is an IRC operator"},
1004 })
1005 })
1006 case irc.RPL_WHOISIDLE:
1007 var nick string
1008 if err := parseMessageParams(msg, nil, &nick, nil); err != nil {
1009 return err
1010 }
1011
1012 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1013 nick := dc.marshalEntity(uc.network, nick)
1014 params := []string{dc.nick, nick}
1015 params = append(params, msg.Params[2:]...)
1016 dc.SendMessage(&irc.Message{
1017 Prefix: dc.srv.prefix(),
1018 Command: irc.RPL_WHOISIDLE,
1019 Params: params,
1020 })
1021 })
1022 case irc.RPL_WHOISCHANNELS:
1023 var nick, channelList string
1024 if err := parseMessageParams(msg, nil, &nick, &channelList); err != nil {
1025 return err
1026 }
1027 channels := splitSpace(channelList)
1028
1029 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1030 nick := dc.marshalEntity(uc.network, nick)
1031 channelList := make([]string, len(channels))
1032 for i, channel := range channels {
1033 prefix, channel := uc.parseMembershipPrefix(channel)
1034 channel = dc.marshalEntity(uc.network, channel)
1035 channelList[i] = prefix.String() + channel
1036 }
1037 channels := strings.Join(channelList, " ")
1038 dc.SendMessage(&irc.Message{
1039 Prefix: dc.srv.prefix(),
1040 Command: irc.RPL_WHOISCHANNELS,
1041 Params: []string{dc.nick, nick, channels},
1042 })
1043 })
1044 case irc.RPL_ENDOFWHOIS:
1045 var nick string
1046 if err := parseMessageParams(msg, nil, &nick); err != nil {
1047 return err
1048 }
1049
1050 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1051 nick := dc.marshalEntity(uc.network, nick)
1052 dc.SendMessage(&irc.Message{
1053 Prefix: dc.srv.prefix(),
1054 Command: irc.RPL_ENDOFWHOIS,
1055 Params: []string{dc.nick, nick, "End of /WHOIS list"},
1056 })
1057 })
1058 case "PRIVMSG":
1059 if msg.Prefix == nil {
1060 return fmt.Errorf("expected a prefix")
1061 }
1062
1063 var entity, text string
1064 if err := parseMessageParams(msg, &entity, &text); err != nil {
1065 return err
1066 }
1067
1068 if msg.Prefix.Name == serviceNick {
1069 uc.logger.Printf("skipping PRIVMSG from soju's service: %v", msg)
1070 break
1071 }
1072 if entity == serviceNick {
1073 uc.logger.Printf("skipping PRIVMSG to soju's service: %v", msg)
1074 break
1075 }
1076
1077 target := entity
1078 if target == uc.nick {
1079 target = msg.Prefix.Name
1080 }
1081 uc.produce(target, msg, nil)
1082 case "INVITE":
1083 var nick string
1084 var channel string
1085 if err := parseMessageParams(msg, &nick, &channel); err != nil {
1086 return err
1087 }
1088
1089 uc.forEachDownstream(func(dc *downstreamConn) {
1090 dc.SendMessage(&irc.Message{
1091 Prefix: dc.marshalUserPrefix(uc.network, msg.Prefix),
1092 Command: "INVITE",
1093 Params: []string{dc.marshalEntity(uc.network, nick), dc.marshalEntity(uc.network, channel)},
1094 })
1095 })
1096 case irc.RPL_INVITING:
1097 var nick string
1098 var channel string
1099 if err := parseMessageParams(msg, &nick, &channel); err != nil {
1100 return err
1101 }
1102
1103 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1104 dc.SendMessage(&irc.Message{
1105 Prefix: dc.srv.prefix(),
1106 Command: irc.RPL_INVITING,
1107 Params: []string{dc.nick, dc.marshalEntity(uc.network, nick), dc.marshalEntity(uc.network, channel)},
1108 })
1109 })
1110 case irc.ERR_UNKNOWNCOMMAND, irc.RPL_TRYAGAIN:
1111 var command, reason string
1112 if err := parseMessageParams(msg, nil, &command, &reason); err != nil {
1113 return err
1114 }
1115
1116 if command == "LIST" {
1117 ok := uc.endPendingLISTs(false)
1118 if !ok {
1119 return fmt.Errorf("unexpected response for LIST: %q: no matching pending LIST", msg.Command)
1120 }
1121 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1122 dc.SendMessage(&irc.Message{
1123 Prefix: uc.srv.prefix(),
1124 Command: msg.Command,
1125 Params: []string{dc.nick, "LIST", reason},
1126 })
1127 })
1128 }
1129 case "TAGMSG":
1130 // TODO: relay to downstream connections that accept message-tags
1131 case "ACK":
1132 // Ignore
1133 case irc.RPL_NOWAWAY, irc.RPL_UNAWAY:
1134 // Ignore
1135 case irc.RPL_YOURHOST, irc.RPL_CREATED:
1136 // Ignore
1137 case irc.RPL_LUSERCLIENT, irc.RPL_LUSEROP, irc.RPL_LUSERUNKNOWN, irc.RPL_LUSERCHANNELS, irc.RPL_LUSERME:
1138 // Ignore
1139 case irc.RPL_MOTDSTART, irc.RPL_MOTD, irc.RPL_ENDOFMOTD:
1140 // Ignore
1141 case irc.RPL_LISTSTART:
1142 // Ignore
1143 case rpl_localusers, rpl_globalusers:
1144 // Ignore
1145 case irc.RPL_STATSVLINE, rpl_statsping, irc.RPL_STATSBLINE, irc.RPL_STATSDLINE:
1146 // Ignore
1147 default:
1148 uc.logger.Printf("unhandled message: %v", msg)
1149 }
1150 return nil
1151}
1152
// splitSpace splits s around runs of the ASCII space character and returns
// the non-empty pieces. Other whitespace, such as tabs, is not a separator.
func splitSpace(s string) []string {
	isSpace := func(r rune) bool { return r == ' ' }
	return strings.FieldsFunc(s, isSpace)
}
1158
1159func (uc *upstreamConn) register() {
1160 uc.nick = uc.network.Nick
1161 uc.username = uc.network.Username
1162 if uc.username == "" {
1163 uc.username = uc.nick
1164 }
1165 uc.realname = uc.network.Realname
1166 if uc.realname == "" {
1167 uc.realname = uc.nick
1168 }
1169
1170 uc.SendMessage(&irc.Message{
1171 Command: "CAP",
1172 Params: []string{"LS", "302"},
1173 })
1174
1175 if uc.network.Pass != "" {
1176 uc.SendMessage(&irc.Message{
1177 Command: "PASS",
1178 Params: []string{uc.network.Pass},
1179 })
1180 }
1181
1182 uc.SendMessage(&irc.Message{
1183 Command: "NICK",
1184 Params: []string{uc.nick},
1185 })
1186 uc.SendMessage(&irc.Message{
1187 Command: "USER",
1188 Params: []string{uc.username, "0", "*", uc.realname},
1189 })
1190}
1191
1192func (uc *upstreamConn) runUntilRegistered() error {
1193 for !uc.registered {
1194 msg, err := uc.ReadMessage()
1195 if err != nil {
1196 return fmt.Errorf("failed to read message: %v", err)
1197 }
1198
1199 if err := uc.handleMessage(msg); err != nil {
1200 return fmt.Errorf("failed to handle message %q: %v", msg, err)
1201 }
1202 }
1203
1204 for _, command := range uc.network.ConnectCommands {
1205 m, err := irc.ParseMessage(command)
1206 if err != nil {
1207 uc.logger.Printf("failed to parse connect command %q: %v", command, err)
1208 } else {
1209 uc.SendMessage(m)
1210 }
1211 }
1212
1213 return nil
1214}
1215
1216func (uc *upstreamConn) requestSASL() bool {
1217 if uc.network.SASL.Mechanism == "" {
1218 return false
1219 }
1220
1221 v, ok := uc.caps["sasl"]
1222 if !ok {
1223 return false
1224 }
1225 if v != "" {
1226 mechanisms := strings.Split(v, ",")
1227 found := false
1228 for _, mech := range mechanisms {
1229 if strings.EqualFold(mech, uc.network.SASL.Mechanism) {
1230 found = true
1231 break
1232 }
1233 }
1234 if !found {
1235 return false
1236 }
1237 }
1238
1239 return true
1240}
1241
1242func (uc *upstreamConn) handleCapAck(name string, ok bool) error {
1243 auth := &uc.network.SASL
1244 switch name {
1245 case "sasl":
1246 if !ok {
1247 uc.logger.Printf("server refused to acknowledge the SASL capability")
1248 return nil
1249 }
1250
1251 switch auth.Mechanism {
1252 case "PLAIN":
1253 uc.logger.Printf("starting SASL PLAIN authentication with username %q", auth.Plain.Username)
1254 uc.saslClient = sasl.NewPlainClient("", auth.Plain.Username, auth.Plain.Password)
1255 default:
1256 return fmt.Errorf("unsupported SASL mechanism %q", name)
1257 }
1258
1259 uc.SendMessage(&irc.Message{
1260 Command: "AUTHENTICATE",
1261 Params: []string{auth.Mechanism},
1262 })
1263 case "message-tags":
1264 uc.tagsSupported = ok
1265 case "labeled-response":
1266 uc.labelsSupported = ok
1267 case "batch", "server-time":
1268 // Nothing to do
1269 default:
1270 uc.logger.Printf("received CAP ACK/NAK for a cap we don't support: %v", name)
1271 }
1272 return nil
1273}
1274
1275func (uc *upstreamConn) readMessages(ch chan<- event) error {
1276 for {
1277 msg, err := uc.ReadMessage()
1278 if err == io.EOF {
1279 break
1280 } else if err != nil {
1281 return fmt.Errorf("failed to read IRC command: %v", err)
1282 }
1283
1284 ch <- eventUpstreamMessage{msg, uc}
1285 }
1286
1287 return nil
1288}
1289
1290func (uc *upstreamConn) SendMessageLabeled(downstreamID uint64, msg *irc.Message) {
1291 if uc.labelsSupported {
1292 if msg.Tags == nil {
1293 msg.Tags = make(map[string]irc.TagValue)
1294 }
1295 msg.Tags["label"] = irc.TagValue(fmt.Sprintf("sd-%d-%d", downstreamID, uc.nextLabelID))
1296 uc.nextLabelID++
1297 }
1298 uc.SendMessage(msg)
1299}
1300
1301// TODO: handle moving logs when a network name changes, when support for this is added
1302func (uc *upstreamConn) appendLog(entity string, msg *irc.Message) {
1303 if uc.srv.LogPath == "" {
1304 return
1305 }
1306
1307 ml, ok := uc.messageLoggers[entity]
1308 if !ok {
1309 ml = newMessageLogger(uc.network, entity)
1310 uc.messageLoggers[entity] = ml
1311 }
1312
1313 if err := ml.Append(msg); err != nil {
1314 uc.logger.Printf("failed to log message: %v", err)
1315 }
1316}
1317
1318// appendHistory appends a message to the history. entity can be empty.
1319func (uc *upstreamConn) appendHistory(entity string, msg *irc.Message) {
1320 // If no client is offline, no need to append the message to the buffer
1321 if len(uc.network.offlineClients) == 0 {
1322 return
1323 }
1324
1325 history, ok := uc.network.history[entity]
1326 if !ok {
1327 history = &networkHistory{
1328 offlineClients: make(map[string]uint64),
1329 ring: NewRing(uc.srv.RingCap),
1330 }
1331 uc.network.history[entity] = history
1332
1333 for clientName, _ := range uc.network.offlineClients {
1334 history.offlineClients[clientName] = 0
1335 }
1336 }
1337
1338 history.ring.Produce(msg)
1339}
1340
1341// produce appends a message to the logs, adds it to the history and forwards
1342// it to connected downstream connections.
1343//
1344// If origin is not nil and origin doesn't support echo-message, the message is
1345// forwarded to all connections except origin.
1346func (uc *upstreamConn) produce(target string, msg *irc.Message, origin *downstreamConn) {
1347 if target != "" {
1348 uc.appendLog(target, msg)
1349 }
1350
1351 uc.appendHistory(target, msg)
1352
1353 uc.forEachDownstream(func(dc *downstreamConn) {
1354 if dc != origin || dc.caps["echo-message"] {
1355 dc.SendMessage(dc.marshalMessage(msg, uc.network))
1356 }
1357 })
1358}
1359
1360func (uc *upstreamConn) updateAway() {
1361 away := true
1362 uc.forEachDownstream(func(*downstreamConn) {
1363 away = false
1364 })
1365 if away == uc.away {
1366 return
1367 }
1368 if away {
1369 uc.SendMessage(&irc.Message{
1370 Command: "AWAY",
1371 Params: []string{"Auto away"},
1372 })
1373 } else {
1374 uc.SendMessage(&irc.Message{
1375 Command: "AWAY",
1376 })
1377 }
1378 uc.away = away
1379}
Note: See TracBrowser for help on using the repository browser.