source: code/trunk/upstream.go@ 190

Last change on this file since 190 was 187, checked in by contact, 5 years ago

Rename AppendLog to appendLog

This function is only safe to call from inside the user goroutine. Let's
make it private.

File size: 36.1 KB
Line 
1package soju
2
3import (
4 "crypto/tls"
5 "encoding/base64"
6 "errors"
7 "fmt"
8 "io"
9 "net"
10 "os"
11 "path/filepath"
12 "strconv"
13 "strings"
14 "time"
15
16 "github.com/emersion/go-sasl"
17 "gopkg.in/irc.v3"
18)
19
20type upstreamChannel struct {
21 Name string
22 conn *upstreamConn
23 Topic string
24 TopicWho string
25 TopicTime time.Time
26 Status channelStatus
27 modes channelModes
28 creationTime string
29 Members map[string]*membership
30 complete bool
31}
32
33type upstreamConn struct {
34 network *network
35 logger Logger
36 net net.Conn
37 irc *irc.Conn
38 srv *Server
39 user *user
40 outgoing chan<- *irc.Message
41 closed chan struct{}
42
43 serverName string
44 availableUserModes string
45 availableChannelModes map[byte]channelModeType
46 availableChannelTypes string
47 availableMemberships []membership
48
49 registered bool
50 nick string
51 username string
52 realname string
53 modes userModes
54 channels map[string]*upstreamChannel
55 caps map[string]string
56 batches map[string]batch
57
58 tagsSupported bool
59 labelsSupported bool
60 nextLabelID uint64
61
62 saslClient sasl.Client
63 saslStarted bool
64
65 // set of LIST commands in progress, per downstream
66 pendingLISTDownstreamSet map[uint64]struct{}
67
68 logs map[string]entityLog
69}
70
// entityLog pairs a name with an open file. upstreamConn stores these in
// its logs map.
// NOTE(review): presumably one log file per channel or nick, managed by
// appendLog - confirm against that function.
type entityLog struct {
	// name identifies the log.
	name string
	// file is the handle associated with name.
	file *os.File
}
75
76func connectToUpstream(network *network) (*upstreamConn, error) {
77 logger := &prefixLogger{network.user.srv.Logger, fmt.Sprintf("upstream %q: ", network.Addr)}
78
79 addr := network.Addr
80 if !strings.ContainsRune(addr, ':') {
81 addr = addr + ":6697"
82 }
83
84 logger.Printf("connecting to TLS server at address %q", addr)
85 netConn, err := tls.Dial("tcp", addr, nil)
86 if err != nil {
87 return nil, fmt.Errorf("failed to dial %q: %v", addr, err)
88 }
89
90 setKeepAlive(netConn)
91
92 outgoing := make(chan *irc.Message, 64)
93 uc := &upstreamConn{
94 network: network,
95 logger: logger,
96 net: netConn,
97 irc: irc.NewConn(netConn),
98 srv: network.user.srv,
99 user: network.user,
100 outgoing: outgoing,
101 closed: make(chan struct{}),
102 channels: make(map[string]*upstreamChannel),
103 caps: make(map[string]string),
104 batches: make(map[string]batch),
105 availableChannelTypes: stdChannelTypes,
106 availableChannelModes: stdChannelModes,
107 availableMemberships: stdMemberships,
108 pendingLISTDownstreamSet: make(map[uint64]struct{}),
109 logs: make(map[string]entityLog),
110 }
111
112 go func() {
113 for {
114 var closed bool
115 select {
116 case msg := <-outgoing:
117 if uc.srv.Debug {
118 uc.logger.Printf("sent: %v", msg)
119 }
120 if err := uc.irc.WriteMessage(msg); err != nil {
121 uc.logger.Printf("failed to write message: %v", err)
122 }
123 case <-uc.closed:
124 closed = true
125 }
126 if closed {
127 break
128 }
129 }
130 if err := uc.net.Close(); err != nil {
131 uc.logger.Printf("failed to close connection: %v", err)
132 } else {
133 uc.logger.Printf("connection closed")
134 }
135 }()
136
137 return uc, nil
138}
139
140func (uc *upstreamConn) isClosed() bool {
141 select {
142 case <-uc.closed:
143 return true
144 default:
145 return false
146 }
147}
148
149// Close closes the connection. It is safe to call from any goroutine.
150func (uc *upstreamConn) Close() error {
151 if uc.isClosed() {
152 return fmt.Errorf("upstream connection already closed")
153 }
154 close(uc.closed)
155 return nil
156}
157
158func (uc *upstreamConn) forEachDownstream(f func(*downstreamConn)) {
159 uc.user.forEachDownstream(func(dc *downstreamConn) {
160 if dc.network != nil && dc.network != uc.network {
161 return
162 }
163 f(dc)
164 })
165}
166
167func (uc *upstreamConn) forEachDownstreamByID(id uint64, f func(*downstreamConn)) {
168 uc.forEachDownstream(func(dc *downstreamConn) {
169 if id != 0 && id != dc.id {
170 return
171 }
172 f(dc)
173 })
174}
175
176func (uc *upstreamConn) getChannel(name string) (*upstreamChannel, error) {
177 ch, ok := uc.channels[name]
178 if !ok {
179 return nil, fmt.Errorf("unknown channel %q", name)
180 }
181 return ch, nil
182}
183
184func (uc *upstreamConn) isChannel(entity string) bool {
185 if i := strings.IndexByte(uc.availableChannelTypes, entity[0]); i >= 0 {
186 return true
187 }
188 return false
189}
190
191func (uc *upstreamConn) getPendingLIST() *pendingLIST {
192 for _, pl := range uc.user.pendingLISTs {
193 if _, ok := pl.pendingCommands[uc.network.ID]; !ok {
194 continue
195 }
196 return &pl
197 }
198 return nil
199}
200
201func (uc *upstreamConn) endPendingLISTs(all bool) (found bool) {
202 found = false
203 for i := 0; i < len(uc.user.pendingLISTs); i++ {
204 pl := uc.user.pendingLISTs[i]
205 if _, ok := pl.pendingCommands[uc.network.ID]; !ok {
206 continue
207 }
208 delete(pl.pendingCommands, uc.network.ID)
209 if len(pl.pendingCommands) == 0 {
210 uc.user.pendingLISTs = append(uc.user.pendingLISTs[:i], uc.user.pendingLISTs[i+1:]...)
211 i--
212 uc.forEachDownstreamByID(pl.downstreamID, func(dc *downstreamConn) {
213 dc.SendMessage(&irc.Message{
214 Prefix: dc.srv.prefix(),
215 Command: irc.RPL_LISTEND,
216 Params: []string{dc.nick, "End of /LIST"},
217 })
218 })
219 }
220 found = true
221 if !all {
222 delete(uc.pendingLISTDownstreamSet, pl.downstreamID)
223 uc.user.forEachUpstream(func(uc *upstreamConn) {
224 uc.trySendLIST(pl.downstreamID)
225 })
226 return
227 }
228 }
229 return
230}
231
232func (uc *upstreamConn) trySendLIST(downstreamID uint64) {
233 // must be called with a lock in uc.user.pendingLISTsLock
234
235 if _, ok := uc.pendingLISTDownstreamSet[downstreamID]; ok {
236 // a LIST command is already pending
237 // we will try again when that command is completed
238 return
239 }
240
241 for _, pl := range uc.user.pendingLISTs {
242 if pl.downstreamID != downstreamID {
243 continue
244 }
245 // this is the first pending LIST command list of the downstream
246 listCommand, ok := pl.pendingCommands[uc.network.ID]
247 if !ok {
248 // there is no command for this upstream in these LIST commands
249 // do not send anything
250 continue
251 }
252 // there is a command for this upstream in these LIST commands
253 // send it now
254
255 uc.SendMessageLabeled(downstreamID, listCommand)
256
257 uc.pendingLISTDownstreamSet[downstreamID] = struct{}{}
258 return
259 }
260}
261
262func (uc *upstreamConn) parseMembershipPrefix(s string) (membership *membership, nick string) {
263 for _, m := range uc.availableMemberships {
264 if m.Prefix == s[0] {
265 return &m, s[1:]
266 }
267 }
268 return nil, s
269}
270
271func (uc *upstreamConn) handleMessage(msg *irc.Message) error {
272 var label string
273 if l, ok := msg.GetTag("label"); ok {
274 label = l
275 }
276
277 var msgBatch *batch
278 if batchName, ok := msg.GetTag("batch"); ok {
279 b, ok := uc.batches[batchName]
280 if !ok {
281 return fmt.Errorf("unexpected batch reference: batch was not defined: %q", batchName)
282 }
283 msgBatch = &b
284 if label == "" {
285 label = msgBatch.Label
286 }
287 }
288
289 var downstreamID uint64 = 0
290 if label != "" {
291 var labelOffset uint64
292 n, err := fmt.Sscanf(label, "sd-%d-%d", &downstreamID, &labelOffset)
293 if err == nil && n < 2 {
294 err = errors.New("not enough arguments")
295 }
296 if err != nil {
297 return fmt.Errorf("unexpected message label: invalid downstream reference for label %q: %v", label, err)
298 }
299 }
300
301 switch msg.Command {
302 case "PING":
303 uc.SendMessage(&irc.Message{
304 Command: "PONG",
305 Params: msg.Params,
306 })
307 return nil
308 case "NOTICE":
309 uc.logger.Print(msg)
310
311 if msg.Prefix.User == "" && msg.Prefix.Host == "" { // server message
312 uc.forEachDownstream(func(dc *downstreamConn) {
313 dc.SendMessage(msg)
314 })
315 } else { // regular user NOTICE
316 var nick, text string
317 if err := parseMessageParams(msg, &nick, &text); err != nil {
318 return err
319 }
320
321 target := nick
322 if nick == uc.nick {
323 target = msg.Prefix.Name
324 }
325 uc.appendLog(target, "<%s> %s", msg.Prefix.Name, text)
326
327 uc.forEachDownstream(func(dc *downstreamConn) {
328 dc.SendMessage(&irc.Message{
329 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
330 Command: "NOTICE",
331 Params: []string{dc.marshalEntity(uc, nick), text},
332 })
333 })
334 }
335 case "CAP":
336 var subCmd string
337 if err := parseMessageParams(msg, nil, &subCmd); err != nil {
338 return err
339 }
340 subCmd = strings.ToUpper(subCmd)
341 subParams := msg.Params[2:]
342 switch subCmd {
343 case "LS":
344 if len(subParams) < 1 {
345 return newNeedMoreParamsError(msg.Command)
346 }
347 caps := strings.Fields(subParams[len(subParams)-1])
348 more := len(subParams) >= 2 && msg.Params[len(subParams)-2] == "*"
349
350 for _, s := range caps {
351 kv := strings.SplitN(s, "=", 2)
352 k := strings.ToLower(kv[0])
353 var v string
354 if len(kv) == 2 {
355 v = kv[1]
356 }
357 uc.caps[k] = v
358 }
359
360 if more {
361 break // wait to receive all capabilities
362 }
363
364 requestCaps := make([]string, 0, 16)
365 for _, c := range []string{"message-tags", "batch", "labeled-response"} {
366 if _, ok := uc.caps[c]; ok {
367 requestCaps = append(requestCaps, c)
368 }
369 }
370
371 if uc.requestSASL() {
372 requestCaps = append(requestCaps, "sasl")
373 }
374
375 if len(requestCaps) > 0 {
376 uc.SendMessage(&irc.Message{
377 Command: "CAP",
378 Params: []string{"REQ", strings.Join(requestCaps, " ")},
379 })
380 }
381
382 if uc.requestSASL() {
383 break // we'll send CAP END after authentication is completed
384 }
385
386 uc.SendMessage(&irc.Message{
387 Command: "CAP",
388 Params: []string{"END"},
389 })
390 case "ACK", "NAK":
391 if len(subParams) < 1 {
392 return newNeedMoreParamsError(msg.Command)
393 }
394 caps := strings.Fields(subParams[0])
395
396 for _, name := range caps {
397 if err := uc.handleCapAck(strings.ToLower(name), subCmd == "ACK"); err != nil {
398 return err
399 }
400 }
401
402 if uc.saslClient == nil {
403 uc.SendMessage(&irc.Message{
404 Command: "CAP",
405 Params: []string{"END"},
406 })
407 }
408 default:
409 uc.logger.Printf("unhandled message: %v", msg)
410 }
411 case "AUTHENTICATE":
412 if uc.saslClient == nil {
413 return fmt.Errorf("received unexpected AUTHENTICATE message")
414 }
415
416 // TODO: if a challenge is 400 bytes long, buffer it
417 var challengeStr string
418 if err := parseMessageParams(msg, &challengeStr); err != nil {
419 uc.SendMessage(&irc.Message{
420 Command: "AUTHENTICATE",
421 Params: []string{"*"},
422 })
423 return err
424 }
425
426 var challenge []byte
427 if challengeStr != "+" {
428 var err error
429 challenge, err = base64.StdEncoding.DecodeString(challengeStr)
430 if err != nil {
431 uc.SendMessage(&irc.Message{
432 Command: "AUTHENTICATE",
433 Params: []string{"*"},
434 })
435 return err
436 }
437 }
438
439 var resp []byte
440 var err error
441 if !uc.saslStarted {
442 _, resp, err = uc.saslClient.Start()
443 uc.saslStarted = true
444 } else {
445 resp, err = uc.saslClient.Next(challenge)
446 }
447 if err != nil {
448 uc.SendMessage(&irc.Message{
449 Command: "AUTHENTICATE",
450 Params: []string{"*"},
451 })
452 return err
453 }
454
455 // TODO: send response in multiple chunks if >= 400 bytes
456 var respStr = "+"
457 if resp != nil {
458 respStr = base64.StdEncoding.EncodeToString(resp)
459 }
460
461 uc.SendMessage(&irc.Message{
462 Command: "AUTHENTICATE",
463 Params: []string{respStr},
464 })
465 case irc.RPL_LOGGEDIN:
466 var account string
467 if err := parseMessageParams(msg, nil, nil, &account); err != nil {
468 return err
469 }
470 uc.logger.Printf("logged in with account %q", account)
471 case irc.RPL_LOGGEDOUT:
472 uc.logger.Printf("logged out")
473 case irc.ERR_NICKLOCKED, irc.RPL_SASLSUCCESS, irc.ERR_SASLFAIL, irc.ERR_SASLTOOLONG, irc.ERR_SASLABORTED:
474 var info string
475 if err := parseMessageParams(msg, nil, &info); err != nil {
476 return err
477 }
478 switch msg.Command {
479 case irc.ERR_NICKLOCKED:
480 uc.logger.Printf("invalid nick used with SASL authentication: %v", info)
481 case irc.ERR_SASLFAIL:
482 uc.logger.Printf("SASL authentication failed: %v", info)
483 case irc.ERR_SASLTOOLONG:
484 uc.logger.Printf("SASL message too long: %v", info)
485 }
486
487 uc.saslClient = nil
488 uc.saslStarted = false
489
490 uc.SendMessage(&irc.Message{
491 Command: "CAP",
492 Params: []string{"END"},
493 })
494 case irc.RPL_WELCOME:
495 uc.registered = true
496 uc.logger.Printf("connection registered")
497
498 channels, err := uc.srv.db.ListChannels(uc.network.ID)
499 if err != nil {
500 uc.logger.Printf("failed to list channels from database: %v", err)
501 break
502 }
503
504 for _, ch := range channels {
505 params := []string{ch.Name}
506 if ch.Key != "" {
507 params = append(params, ch.Key)
508 }
509 uc.SendMessage(&irc.Message{
510 Command: "JOIN",
511 Params: params,
512 })
513 }
514 case irc.RPL_MYINFO:
515 if err := parseMessageParams(msg, nil, &uc.serverName, nil, &uc.availableUserModes, nil); err != nil {
516 return err
517 }
518 case irc.RPL_ISUPPORT:
519 if err := parseMessageParams(msg, nil, nil); err != nil {
520 return err
521 }
522 for _, token := range msg.Params[1 : len(msg.Params)-1] {
523 negate := false
524 parameter := token
525 value := ""
526 if strings.HasPrefix(token, "-") {
527 negate = true
528 token = token[1:]
529 } else {
530 if i := strings.IndexByte(token, '='); i >= 0 {
531 parameter = token[:i]
532 value = token[i+1:]
533 }
534 }
535 if !negate {
536 switch parameter {
537 case "CHANMODES":
538 parts := strings.SplitN(value, ",", 5)
539 if len(parts) < 4 {
540 return fmt.Errorf("malformed ISUPPORT CHANMODES value: %v", value)
541 }
542 modes := make(map[byte]channelModeType)
543 for i, mt := range []channelModeType{modeTypeA, modeTypeB, modeTypeC, modeTypeD} {
544 for j := 0; j < len(parts[i]); j++ {
545 mode := parts[i][j]
546 modes[mode] = mt
547 }
548 }
549 uc.availableChannelModes = modes
550 case "CHANTYPES":
551 uc.availableChannelTypes = value
552 case "PREFIX":
553 if value == "" {
554 uc.availableMemberships = nil
555 } else {
556 if value[0] != '(' {
557 return fmt.Errorf("malformed ISUPPORT PREFIX value: %v", value)
558 }
559 sep := strings.IndexByte(value, ')')
560 if sep < 0 || len(value) != sep*2 {
561 return fmt.Errorf("malformed ISUPPORT PREFIX value: %v", value)
562 }
563 memberships := make([]membership, len(value)/2-1)
564 for i := range memberships {
565 memberships[i] = membership{
566 Mode: value[i+1],
567 Prefix: value[sep+i+1],
568 }
569 }
570 uc.availableMemberships = memberships
571 }
572 }
573 } else {
574 // TODO: handle ISUPPORT negations
575 }
576 }
577 case "BATCH":
578 var tag string
579 if err := parseMessageParams(msg, &tag); err != nil {
580 return err
581 }
582
583 if strings.HasPrefix(tag, "+") {
584 tag = tag[1:]
585 if _, ok := uc.batches[tag]; ok {
586 return fmt.Errorf("unexpected BATCH reference tag: batch was already defined: %q", tag)
587 }
588 var batchType string
589 if err := parseMessageParams(msg, nil, &batchType); err != nil {
590 return err
591 }
592 label := label
593 if label == "" && msgBatch != nil {
594 label = msgBatch.Label
595 }
596 uc.batches[tag] = batch{
597 Type: batchType,
598 Params: msg.Params[2:],
599 Outer: msgBatch,
600 Label: label,
601 }
602 } else if strings.HasPrefix(tag, "-") {
603 tag = tag[1:]
604 if _, ok := uc.batches[tag]; !ok {
605 return fmt.Errorf("unknown BATCH reference tag: %q", tag)
606 }
607 delete(uc.batches, tag)
608 } else {
609 return fmt.Errorf("unexpected BATCH reference tag: missing +/- prefix: %q", tag)
610 }
611 case "NICK":
612 if msg.Prefix == nil {
613 return fmt.Errorf("expected a prefix")
614 }
615
616 var newNick string
617 if err := parseMessageParams(msg, &newNick); err != nil {
618 return err
619 }
620
621 if msg.Prefix.Name == uc.nick {
622 uc.logger.Printf("changed nick from %q to %q", uc.nick, newNick)
623 uc.nick = newNick
624 }
625
626 for _, ch := range uc.channels {
627 if membership, ok := ch.Members[msg.Prefix.Name]; ok {
628 delete(ch.Members, msg.Prefix.Name)
629 ch.Members[newNick] = membership
630 uc.appendLog(ch.Name, "*** %s is now known as %s", msg.Prefix.Name, newNick)
631 }
632 }
633
634 if msg.Prefix.Name != uc.nick {
635 uc.forEachDownstream(func(dc *downstreamConn) {
636 dc.SendMessage(&irc.Message{
637 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
638 Command: "NICK",
639 Params: []string{newNick},
640 })
641 })
642 }
643 case "JOIN":
644 if msg.Prefix == nil {
645 return fmt.Errorf("expected a prefix")
646 }
647
648 var channels string
649 if err := parseMessageParams(msg, &channels); err != nil {
650 return err
651 }
652
653 for _, ch := range strings.Split(channels, ",") {
654 if msg.Prefix.Name == uc.nick {
655 uc.logger.Printf("joined channel %q", ch)
656 uc.channels[ch] = &upstreamChannel{
657 Name: ch,
658 conn: uc,
659 Members: make(map[string]*membership),
660 }
661
662 uc.SendMessage(&irc.Message{
663 Command: "MODE",
664 Params: []string{ch},
665 })
666 } else {
667 ch, err := uc.getChannel(ch)
668 if err != nil {
669 return err
670 }
671 ch.Members[msg.Prefix.Name] = nil
672 }
673
674 uc.appendLog(ch, "*** Joins: %s (%s@%s)", msg.Prefix.Name, msg.Prefix.User, msg.Prefix.Host)
675
676 uc.forEachDownstream(func(dc *downstreamConn) {
677 dc.SendMessage(&irc.Message{
678 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
679 Command: "JOIN",
680 Params: []string{dc.marshalChannel(uc, ch)},
681 })
682 })
683 }
684 case "PART":
685 if msg.Prefix == nil {
686 return fmt.Errorf("expected a prefix")
687 }
688
689 var channels string
690 if err := parseMessageParams(msg, &channels); err != nil {
691 return err
692 }
693
694 var reason string
695 if len(msg.Params) > 1 {
696 reason = msg.Params[1]
697 }
698
699 for _, ch := range strings.Split(channels, ",") {
700 if msg.Prefix.Name == uc.nick {
701 uc.logger.Printf("parted channel %q", ch)
702 delete(uc.channels, ch)
703 } else {
704 ch, err := uc.getChannel(ch)
705 if err != nil {
706 return err
707 }
708 delete(ch.Members, msg.Prefix.Name)
709 }
710
711 uc.appendLog(ch, "*** Parts: %s (%s@%s) (%s)", msg.Prefix.Name, msg.Prefix.User, msg.Prefix.Host, reason)
712
713 uc.forEachDownstream(func(dc *downstreamConn) {
714 dc.SendMessage(&irc.Message{
715 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
716 Command: "PART",
717 Params: []string{dc.marshalChannel(uc, ch)},
718 })
719 })
720 }
721 case "KICK":
722 if msg.Prefix == nil {
723 return fmt.Errorf("expected a prefix")
724 }
725
726 var channel, user string
727 if err := parseMessageParams(msg, &channel, &user); err != nil {
728 return err
729 }
730
731 var reason string
732 if len(msg.Params) > 2 {
733 reason = msg.Params[1]
734 }
735
736 if user == uc.nick {
737 uc.logger.Printf("kicked from channel %q by %s", channel, msg.Prefix.Name)
738 delete(uc.channels, channel)
739 } else {
740 ch, err := uc.getChannel(channel)
741 if err != nil {
742 return err
743 }
744 delete(ch.Members, user)
745 }
746
747 uc.appendLog(channel, "*** %s was kicked by %s (%s)", user, msg.Prefix.Name, reason)
748
749 uc.forEachDownstream(func(dc *downstreamConn) {
750 params := []string{dc.marshalChannel(uc, channel), dc.marshalNick(uc, user)}
751 if reason != "" {
752 params = append(params, reason)
753 }
754 dc.SendMessage(&irc.Message{
755 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
756 Command: "KICK",
757 Params: params,
758 })
759 })
760 case "QUIT":
761 if msg.Prefix == nil {
762 return fmt.Errorf("expected a prefix")
763 }
764
765 var reason string
766 if len(msg.Params) > 0 {
767 reason = msg.Params[0]
768 }
769
770 if msg.Prefix.Name == uc.nick {
771 uc.logger.Printf("quit")
772 }
773
774 for _, ch := range uc.channels {
775 if _, ok := ch.Members[msg.Prefix.Name]; ok {
776 delete(ch.Members, msg.Prefix.Name)
777
778 uc.appendLog(ch.Name, "*** Quits: %s (%s@%s) (%s)", msg.Prefix.Name, msg.Prefix.User, msg.Prefix.Host, reason)
779 }
780 }
781
782 if msg.Prefix.Name != uc.nick {
783 uc.forEachDownstream(func(dc *downstreamConn) {
784 dc.SendMessage(&irc.Message{
785 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
786 Command: "QUIT",
787 Params: msg.Params,
788 })
789 })
790 }
791 case irc.RPL_TOPIC, irc.RPL_NOTOPIC:
792 var name, topic string
793 if err := parseMessageParams(msg, nil, &name, &topic); err != nil {
794 return err
795 }
796 ch, err := uc.getChannel(name)
797 if err != nil {
798 return err
799 }
800 if msg.Command == irc.RPL_TOPIC {
801 ch.Topic = topic
802 } else {
803 ch.Topic = ""
804 }
805 case "TOPIC":
806 var name string
807 if err := parseMessageParams(msg, &name); err != nil {
808 return err
809 }
810 ch, err := uc.getChannel(name)
811 if err != nil {
812 return err
813 }
814 if len(msg.Params) > 1 {
815 ch.Topic = msg.Params[1]
816 } else {
817 ch.Topic = ""
818 }
819 uc.forEachDownstream(func(dc *downstreamConn) {
820 params := []string{dc.marshalChannel(uc, name)}
821 if ch.Topic != "" {
822 params = append(params, ch.Topic)
823 }
824 dc.SendMessage(&irc.Message{
825 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
826 Command: "TOPIC",
827 Params: params,
828 })
829 })
830 case "MODE":
831 var name, modeStr string
832 if err := parseMessageParams(msg, &name, &modeStr); err != nil {
833 return err
834 }
835
836 if !uc.isChannel(name) { // user mode change
837 if name != uc.nick {
838 return fmt.Errorf("received MODE message for unknown nick %q", name)
839 }
840 return uc.modes.Apply(modeStr)
841 // TODO: notify downstreams about user mode change?
842 } else { // channel mode change
843 ch, err := uc.getChannel(name)
844 if err != nil {
845 return err
846 }
847
848 if ch.modes != nil {
849 if err := ch.modes.Apply(uc.availableChannelModes, modeStr, msg.Params[2:]...); err != nil {
850 return err
851 }
852 }
853
854 modeMsg := modeStr
855 for _, v := range msg.Params[2:] {
856 modeMsg += " " + v
857 }
858 uc.appendLog(ch.Name, "*** %s sets mode: %s", msg.Prefix.Name, modeMsg)
859
860 uc.forEachDownstream(func(dc *downstreamConn) {
861 params := []string{dc.marshalChannel(uc, name), modeStr}
862 params = append(params, msg.Params[2:]...)
863
864 dc.SendMessage(&irc.Message{
865 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
866 Command: "MODE",
867 Params: params,
868 })
869 })
870 }
871 case irc.RPL_UMODEIS:
872 if err := parseMessageParams(msg, nil); err != nil {
873 return err
874 }
875 modeStr := ""
876 if len(msg.Params) > 1 {
877 modeStr = msg.Params[1]
878 }
879
880 uc.modes = ""
881 if err := uc.modes.Apply(modeStr); err != nil {
882 return err
883 }
884 // TODO: send RPL_UMODEIS to downstream connections when applicable
885 case irc.RPL_CHANNELMODEIS:
886 var channel string
887 if err := parseMessageParams(msg, nil, &channel); err != nil {
888 return err
889 }
890 modeStr := ""
891 if len(msg.Params) > 2 {
892 modeStr = msg.Params[2]
893 }
894
895 ch, err := uc.getChannel(channel)
896 if err != nil {
897 return err
898 }
899
900 firstMode := ch.modes == nil
901 ch.modes = make(map[byte]string)
902 if err := ch.modes.Apply(uc.availableChannelModes, modeStr, msg.Params[3:]...); err != nil {
903 return err
904 }
905 if firstMode {
906 modeStr, modeParams := ch.modes.Format()
907
908 uc.forEachDownstream(func(dc *downstreamConn) {
909 params := []string{dc.nick, dc.marshalChannel(uc, channel), modeStr}
910 params = append(params, modeParams...)
911
912 dc.SendMessage(&irc.Message{
913 Prefix: dc.srv.prefix(),
914 Command: irc.RPL_CHANNELMODEIS,
915 Params: params,
916 })
917 })
918 }
919 case rpl_creationtime:
920 var channel, creationTime string
921 if err := parseMessageParams(msg, nil, &channel, &creationTime); err != nil {
922 return err
923 }
924
925 ch, err := uc.getChannel(channel)
926 if err != nil {
927 return err
928 }
929
930 firstCreationTime := ch.creationTime == ""
931 ch.creationTime = creationTime
932 if firstCreationTime {
933 uc.forEachDownstream(func(dc *downstreamConn) {
934 dc.SendMessage(&irc.Message{
935 Prefix: dc.srv.prefix(),
936 Command: rpl_creationtime,
937 Params: []string{dc.nick, channel, creationTime},
938 })
939 })
940 }
941 case rpl_topicwhotime:
942 var name, who, timeStr string
943 if err := parseMessageParams(msg, nil, &name, &who, &timeStr); err != nil {
944 return err
945 }
946 ch, err := uc.getChannel(name)
947 if err != nil {
948 return err
949 }
950 ch.TopicWho = who
951 sec, err := strconv.ParseInt(timeStr, 10, 64)
952 if err != nil {
953 return fmt.Errorf("failed to parse topic time: %v", err)
954 }
955 ch.TopicTime = time.Unix(sec, 0)
956 case irc.RPL_LIST:
957 var channel, clients, topic string
958 if err := parseMessageParams(msg, nil, &channel, &clients, &topic); err != nil {
959 return err
960 }
961
962 pl := uc.getPendingLIST()
963 if pl == nil {
964 return fmt.Errorf("unexpected RPL_LIST: no matching pending LIST")
965 }
966
967 uc.forEachDownstreamByID(pl.downstreamID, func(dc *downstreamConn) {
968 dc.SendMessage(&irc.Message{
969 Prefix: dc.srv.prefix(),
970 Command: irc.RPL_LIST,
971 Params: []string{dc.nick, dc.marshalChannel(uc, channel), clients, topic},
972 })
973 })
974 case irc.RPL_LISTEND:
975 ok := uc.endPendingLISTs(false)
976 if !ok {
977 return fmt.Errorf("unexpected RPL_LISTEND: no matching pending LIST")
978 }
979 case irc.RPL_NAMREPLY:
980 var name, statusStr, members string
981 if err := parseMessageParams(msg, nil, &statusStr, &name, &members); err != nil {
982 return err
983 }
984
985 ch, ok := uc.channels[name]
986 if !ok {
987 // NAMES on a channel we have not joined, forward to downstream
988 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
989 channel := dc.marshalChannel(uc, name)
990 members := splitSpace(members)
991 for i, member := range members {
992 membership, nick := uc.parseMembershipPrefix(member)
993 members[i] = membership.String() + dc.marshalNick(uc, nick)
994 }
995 memberStr := strings.Join(members, " ")
996
997 dc.SendMessage(&irc.Message{
998 Prefix: dc.srv.prefix(),
999 Command: irc.RPL_NAMREPLY,
1000 Params: []string{dc.nick, statusStr, channel, memberStr},
1001 })
1002 })
1003 return nil
1004 }
1005
1006 status, err := parseChannelStatus(statusStr)
1007 if err != nil {
1008 return err
1009 }
1010 ch.Status = status
1011
1012 for _, s := range splitSpace(members) {
1013 membership, nick := uc.parseMembershipPrefix(s)
1014 ch.Members[nick] = membership
1015 }
1016 case irc.RPL_ENDOFNAMES:
1017 var name string
1018 if err := parseMessageParams(msg, nil, &name); err != nil {
1019 return err
1020 }
1021
1022 ch, ok := uc.channels[name]
1023 if !ok {
1024 // NAMES on a channel we have not joined, forward to downstream
1025 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1026 channel := dc.marshalChannel(uc, name)
1027
1028 dc.SendMessage(&irc.Message{
1029 Prefix: dc.srv.prefix(),
1030 Command: irc.RPL_ENDOFNAMES,
1031 Params: []string{dc.nick, channel, "End of /NAMES list"},
1032 })
1033 })
1034 return nil
1035 }
1036
1037 if ch.complete {
1038 return fmt.Errorf("received unexpected RPL_ENDOFNAMES")
1039 }
1040 ch.complete = true
1041
1042 uc.forEachDownstream(func(dc *downstreamConn) {
1043 forwardChannel(dc, ch)
1044 })
1045 case irc.RPL_WHOREPLY:
1046 var channel, username, host, server, nick, mode, trailing string
1047 if err := parseMessageParams(msg, nil, &channel, &username, &host, &server, &nick, &mode, &trailing); err != nil {
1048 return err
1049 }
1050
1051 parts := strings.SplitN(trailing, " ", 2)
1052 if len(parts) != 2 {
1053 return fmt.Errorf("received malformed RPL_WHOREPLY: wrong trailing parameter: %s", trailing)
1054 }
1055 realname := parts[1]
1056 hops, err := strconv.Atoi(parts[0])
1057 if err != nil {
1058 return fmt.Errorf("received malformed RPL_WHOREPLY: wrong hop count: %s", parts[0])
1059 }
1060 hops++
1061
1062 trailing = strconv.Itoa(hops) + " " + realname
1063
1064 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1065 channel := channel
1066 if channel != "*" {
1067 channel = dc.marshalChannel(uc, channel)
1068 }
1069 nick := dc.marshalNick(uc, nick)
1070 dc.SendMessage(&irc.Message{
1071 Prefix: dc.srv.prefix(),
1072 Command: irc.RPL_WHOREPLY,
1073 Params: []string{dc.nick, channel, username, host, server, nick, mode, trailing},
1074 })
1075 })
1076 case irc.RPL_ENDOFWHO:
1077 var name string
1078 if err := parseMessageParams(msg, nil, &name); err != nil {
1079 return err
1080 }
1081
1082 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1083 name := name
1084 if name != "*" {
1085 // TODO: support WHO masks
1086 name = dc.marshalEntity(uc, name)
1087 }
1088 dc.SendMessage(&irc.Message{
1089 Prefix: dc.srv.prefix(),
1090 Command: irc.RPL_ENDOFWHO,
1091 Params: []string{dc.nick, name, "End of /WHO list"},
1092 })
1093 })
1094 case irc.RPL_WHOISUSER:
1095 var nick, username, host, realname string
1096 if err := parseMessageParams(msg, nil, &nick, &username, &host, nil, &realname); err != nil {
1097 return err
1098 }
1099
1100 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1101 nick := dc.marshalNick(uc, nick)
1102 dc.SendMessage(&irc.Message{
1103 Prefix: dc.srv.prefix(),
1104 Command: irc.RPL_WHOISUSER,
1105 Params: []string{dc.nick, nick, username, host, "*", realname},
1106 })
1107 })
1108 case irc.RPL_WHOISSERVER:
1109 var nick, server, serverInfo string
1110 if err := parseMessageParams(msg, nil, &nick, &server, &serverInfo); err != nil {
1111 return err
1112 }
1113
1114 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1115 nick := dc.marshalNick(uc, nick)
1116 dc.SendMessage(&irc.Message{
1117 Prefix: dc.srv.prefix(),
1118 Command: irc.RPL_WHOISSERVER,
1119 Params: []string{dc.nick, nick, server, serverInfo},
1120 })
1121 })
1122 case irc.RPL_WHOISOPERATOR:
1123 var nick string
1124 if err := parseMessageParams(msg, nil, &nick); err != nil {
1125 return err
1126 }
1127
1128 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1129 nick := dc.marshalNick(uc, nick)
1130 dc.SendMessage(&irc.Message{
1131 Prefix: dc.srv.prefix(),
1132 Command: irc.RPL_WHOISOPERATOR,
1133 Params: []string{dc.nick, nick, "is an IRC operator"},
1134 })
1135 })
1136 case irc.RPL_WHOISIDLE:
1137 var nick string
1138 if err := parseMessageParams(msg, nil, &nick, nil); err != nil {
1139 return err
1140 }
1141
1142 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1143 nick := dc.marshalNick(uc, nick)
1144 params := []string{dc.nick, nick}
1145 params = append(params, msg.Params[2:]...)
1146 dc.SendMessage(&irc.Message{
1147 Prefix: dc.srv.prefix(),
1148 Command: irc.RPL_WHOISIDLE,
1149 Params: params,
1150 })
1151 })
1152 case irc.RPL_WHOISCHANNELS:
1153 var nick, channelList string
1154 if err := parseMessageParams(msg, nil, &nick, &channelList); err != nil {
1155 return err
1156 }
1157 channels := splitSpace(channelList)
1158
1159 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1160 nick := dc.marshalNick(uc, nick)
1161 channelList := make([]string, len(channels))
1162 for i, channel := range channels {
1163 prefix, channel := uc.parseMembershipPrefix(channel)
1164 channel = dc.marshalChannel(uc, channel)
1165 channelList[i] = prefix.String() + channel
1166 }
1167 channels := strings.Join(channelList, " ")
1168 dc.SendMessage(&irc.Message{
1169 Prefix: dc.srv.prefix(),
1170 Command: irc.RPL_WHOISCHANNELS,
1171 Params: []string{dc.nick, nick, channels},
1172 })
1173 })
1174 case irc.RPL_ENDOFWHOIS:
1175 var nick string
1176 if err := parseMessageParams(msg, nil, &nick); err != nil {
1177 return err
1178 }
1179
1180 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1181 nick := dc.marshalNick(uc, nick)
1182 dc.SendMessage(&irc.Message{
1183 Prefix: dc.srv.prefix(),
1184 Command: irc.RPL_ENDOFWHOIS,
1185 Params: []string{dc.nick, nick, "End of /WHOIS list"},
1186 })
1187 })
1188 case "PRIVMSG":
1189 if msg.Prefix == nil {
1190 return fmt.Errorf("expected a prefix")
1191 }
1192
1193 var nick, text string
1194 if err := parseMessageParams(msg, &nick, &text); err != nil {
1195 return err
1196 }
1197
1198 if msg.Prefix.Name == serviceNick {
1199 uc.logger.Printf("skipping PRIVMSG from soju's service: %v", msg)
1200 break
1201 }
1202 if nick == serviceNick {
1203 uc.logger.Printf("skipping PRIVMSG to soju's service: %v", msg)
1204 break
1205 }
1206
1207 target := nick
1208 if nick == uc.nick {
1209 target = msg.Prefix.Name
1210 }
1211 uc.appendLog(target, "<%s> %s", msg.Prefix.Name, text)
1212
1213 uc.network.ring.Produce(msg)
1214 case "INVITE":
1215 var nick string
1216 var channel string
1217 if err := parseMessageParams(msg, &nick, &channel); err != nil {
1218 return err
1219 }
1220
1221 uc.forEachDownstream(func(dc *downstreamConn) {
1222 dc.SendMessage(&irc.Message{
1223 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
1224 Command: "INVITE",
1225 Params: []string{dc.marshalNick(uc, nick), dc.marshalChannel(uc, channel)},
1226 })
1227 })
1228 case irc.RPL_INVITING:
1229 var nick string
1230 var channel string
1231 if err := parseMessageParams(msg, &nick, &channel); err != nil {
1232 return err
1233 }
1234
1235 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1236 dc.SendMessage(&irc.Message{
1237 Prefix: dc.srv.prefix(),
1238 Command: irc.RPL_INVITING,
1239 Params: []string{dc.nick, dc.marshalNick(uc, nick), dc.marshalChannel(uc, channel)},
1240 })
1241 })
1242 case irc.ERR_UNKNOWNCOMMAND, irc.RPL_TRYAGAIN:
1243 var command, reason string
1244 if err := parseMessageParams(msg, nil, &command, &reason); err != nil {
1245 return err
1246 }
1247
1248 if command == "LIST" {
1249 ok := uc.endPendingLISTs(false)
1250 if !ok {
1251 return fmt.Errorf("unexpected response for LIST: %q: no matching pending LIST", msg.Command)
1252 }
1253 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1254 dc.SendMessage(&irc.Message{
1255 Prefix: uc.srv.prefix(),
1256 Command: msg.Command,
1257 Params: []string{dc.nick, "LIST", reason},
1258 })
1259 })
1260 }
1261 case "TAGMSG":
1262 // TODO: relay to downstream connections that accept message-tags
1263 case "ACK":
1264 // Ignore
1265 case irc.RPL_YOURHOST, irc.RPL_CREATED:
1266 // Ignore
1267 case irc.RPL_LUSERCLIENT, irc.RPL_LUSEROP, irc.RPL_LUSERUNKNOWN, irc.RPL_LUSERCHANNELS, irc.RPL_LUSERME:
1268 // Ignore
1269 case irc.RPL_MOTDSTART, irc.RPL_MOTD, irc.RPL_ENDOFMOTD:
1270 // Ignore
1271 case irc.RPL_LISTSTART:
1272 // Ignore
1273 case rpl_localusers, rpl_globalusers:
1274 // Ignore
1275 case irc.RPL_STATSVLINE, rpl_statsping, irc.RPL_STATSBLINE, irc.RPL_STATSDLINE:
1276 // Ignore
1277 default:
1278 uc.logger.Printf("unhandled message: %v", msg)
1279 }
1280 return nil
1281}
1282
// splitSpace cuts s into the fields separated by one or more ASCII space
// characters (' '). Other whitespace, such as tabs, is not treated as a
// separator. Leading, trailing and repeated spaces never yield empty fields.
func splitSpace(s string) []string {
	isSpace := func(c rune) bool { return c == ' ' }
	return strings.FieldsFunc(s, isSpace)
}
1288
1289func (uc *upstreamConn) register() {
1290 uc.nick = uc.network.Nick
1291 uc.username = uc.network.Username
1292 if uc.username == "" {
1293 uc.username = uc.nick
1294 }
1295 uc.realname = uc.network.Realname
1296 if uc.realname == "" {
1297 uc.realname = uc.nick
1298 }
1299
1300 uc.SendMessage(&irc.Message{
1301 Command: "CAP",
1302 Params: []string{"LS", "302"},
1303 })
1304
1305 if uc.network.Pass != "" {
1306 uc.SendMessage(&irc.Message{
1307 Command: "PASS",
1308 Params: []string{uc.network.Pass},
1309 })
1310 }
1311
1312 uc.SendMessage(&irc.Message{
1313 Command: "NICK",
1314 Params: []string{uc.nick},
1315 })
1316 uc.SendMessage(&irc.Message{
1317 Command: "USER",
1318 Params: []string{uc.username, "0", "*", uc.realname},
1319 })
1320}
1321
1322func (uc *upstreamConn) requestSASL() bool {
1323 if uc.network.SASL.Mechanism == "" {
1324 return false
1325 }
1326
1327 v, ok := uc.caps["sasl"]
1328 if !ok {
1329 return false
1330 }
1331 if v != "" {
1332 mechanisms := strings.Split(v, ",")
1333 found := false
1334 for _, mech := range mechanisms {
1335 if strings.EqualFold(mech, uc.network.SASL.Mechanism) {
1336 found = true
1337 break
1338 }
1339 }
1340 if !found {
1341 return false
1342 }
1343 }
1344
1345 return true
1346}
1347
1348func (uc *upstreamConn) handleCapAck(name string, ok bool) error {
1349 auth := &uc.network.SASL
1350 switch name {
1351 case "sasl":
1352 if !ok {
1353 uc.logger.Printf("server refused to acknowledge the SASL capability")
1354 return nil
1355 }
1356
1357 switch auth.Mechanism {
1358 case "PLAIN":
1359 uc.logger.Printf("starting SASL PLAIN authentication with username %q", auth.Plain.Username)
1360 uc.saslClient = sasl.NewPlainClient("", auth.Plain.Username, auth.Plain.Password)
1361 default:
1362 return fmt.Errorf("unsupported SASL mechanism %q", name)
1363 }
1364
1365 uc.SendMessage(&irc.Message{
1366 Command: "AUTHENTICATE",
1367 Params: []string{auth.Mechanism},
1368 })
1369 case "message-tags":
1370 uc.tagsSupported = ok
1371 case "labeled-response":
1372 uc.labelsSupported = ok
1373 }
1374 return nil
1375}
1376
1377func (uc *upstreamConn) readMessages(ch chan<- event) error {
1378 for {
1379 msg, err := uc.irc.ReadMessage()
1380 if err == io.EOF {
1381 break
1382 } else if err != nil {
1383 return fmt.Errorf("failed to read IRC command: %v", err)
1384 }
1385
1386 if uc.srv.Debug {
1387 uc.logger.Printf("received: %v", msg)
1388 }
1389
1390 ch <- eventUpstreamMessage{msg, uc}
1391 }
1392
1393 return nil
1394}
1395
1396// SendMessage queues a new outgoing message. It is safe to call from any
1397// goroutine.
1398func (uc *upstreamConn) SendMessage(msg *irc.Message) {
1399 uc.outgoing <- msg
1400}
1401
1402func (uc *upstreamConn) SendMessageLabeled(downstreamID uint64, msg *irc.Message) {
1403 if uc.labelsSupported {
1404 if msg.Tags == nil {
1405 msg.Tags = make(map[string]irc.TagValue)
1406 }
1407 msg.Tags["label"] = irc.TagValue(fmt.Sprintf("sd-%d-%d", downstreamID, uc.nextLabelID))
1408 uc.nextLabelID++
1409 }
1410 uc.SendMessage(msg)
1411}
1412
1413// TODO: handle moving logs when a network name changes, when support for this is added
1414func (uc *upstreamConn) appendLog(entity string, format string, a ...interface{}) {
1415 if uc.srv.LogPath == "" {
1416 return
1417 }
1418 // TODO: enforce maximum open file handles (LRU cache of file handles)
1419 // TODO: handle non-monotonic clock behaviour
1420 now := time.Now()
1421 year, month, day := now.Date()
1422 name := fmt.Sprintf("%04d-%02d-%02d.log", year, month, day)
1423 log, ok := uc.logs[entity]
1424 if !ok || log.name != name {
1425 if ok {
1426 log.file.Close()
1427 delete(uc.logs, entity)
1428 }
1429 // TODO: handle/forbid network/entity names with illegal path characters
1430 dir := filepath.Join(uc.srv.LogPath, uc.user.Username, uc.network.Name, entity)
1431 if err := os.MkdirAll(dir, 0600); err != nil {
1432 uc.logger.Printf("failed to log message: could not create logs directory %q: %v", dir, err)
1433 return
1434 }
1435 path := filepath.Join(dir, name)
1436 f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0600)
1437 if err != nil {
1438 uc.logger.Printf("failed to log message: could not open or create log file %q: %v", path, err)
1439 return
1440 }
1441 log = entityLog{
1442 name: name,
1443 file: f,
1444 }
1445 uc.logs[entity] = log
1446 }
1447
1448 format = "[%02d:%02d:%02d] " + format + "\n"
1449 args := []interface{}{now.Hour(), now.Minute(), now.Second()}
1450 args = append(args, a...)
1451
1452 if _, err := fmt.Fprintf(log.file, format, args...); err != nil {
1453 uc.logger.Printf("failed to log message to %q: %v", log.name, err)
1454 }
1455}
Note: See TracBrowser for help on using the repository browser.