source: code/trunk/upstream.go@ 178

Last change on this file since 178 was 178, checked in by delthas, 5 years ago

Add support for bouncer logs

Add bouncer logs, in a network/channel/date.log format, similar to the
ZNC log module. PRIVMSG, JOIN, PART, QUIT and MODE messages are logged.

Add a config directive for the logs file, including a way to disable
them entirely.

File size: 36.2 KB
Line 
1package soju
2
3import (
4 "crypto/tls"
5 "encoding/base64"
6 "errors"
7 "fmt"
8 "io"
9 "net"
10 "os"
11 "path/filepath"
12 "strconv"
13 "strings"
14 "time"
15
16 "github.com/emersion/go-sasl"
17 "gopkg.in/irc.v3"
18)
19
// upstreamChannel holds the state tracked for one channel joined on an
// upstream connection.
type upstreamChannel struct {
	// Name is the channel name as used on the upstream server.
	Name string
	// conn is the upstream connection this channel belongs to.
	conn *upstreamConn
	// Topic is the current topic; it is empty when no topic is set.
	Topic string
	// TopicWho and TopicTime are filled in from the topic who/time reply.
	TopicWho string
	TopicTime time.Time
	// Status is the channel status parsed from RPL_NAMREPLY.
	Status channelStatus
	// modes stays nil until the first RPL_CHANNELMODEIS has been received.
	modes channelModes
	// creationTime is kept as the raw string sent by the server; it is
	// empty until the creation time reply has been received.
	creationTime string
	// Members maps nicknames to their membership; the value is nil for
	// members that joined without a known membership prefix.
	Members map[string]*membership
	// complete is set once RPL_ENDOFNAMES has been received for the channel.
	complete bool
}
32
// upstreamConn is the bouncer's connection to a single upstream IRC server,
// together with the protocol state tracked for that connection.
type upstreamConn struct {
	// Connection plumbing. Messages queued on outgoing are written to the
	// server by a goroutine started in connectToUpstream; closed is closed
	// by Close to signal that the connection is shutting down.
	network  *network
	logger   Logger
	net      net.Conn
	irc      *irc.Conn
	srv      *Server
	user     *user
	outgoing chan<- *irc.Message
	closed   chan struct{}

	// Server information. serverName and availableUserModes come from
	// RPL_MYINFO; the channel modes, channel types and memberships start
	// out with the standard defaults and are replaced by the values
	// advertised in RPL_ISUPPORT.
	serverName            string
	availableUserModes    string
	availableChannelModes map[byte]channelModeType
	availableChannelTypes string
	availableMemberships  []membership

	// Session state. registered is set on RPL_WELCOME. channels is keyed by
	// channel name, caps by lower-cased capability name and batches by
	// batch reference tag.
	registered bool
	nick       string
	username   string
	realname   string
	modes      userModes
	channels   map[string]*upstreamChannel
	caps       map[string]string
	batches    map[string]batch

	// NOTE(review): these are not assigned in the visible part of the file;
	// presumably they track negotiated message-tags/labeled-response
	// support and the next label to hand out — confirm.
	tagsSupported   bool
	labelsSupported bool
	nextLabelID     uint64

	// saslClient is nil when no SASL authentication is in progress;
	// saslStarted records whether saslClient.Start has been called.
	saslClient  sasl.Client
	saslStarted bool

	// set of LIST commands in progress, per downstream
	// access is synchronized with user.pendingLISTsLock
	pendingLISTDownstreamSet map[uint64]struct{}

	// logs holds the open log files; they are closed by Close.
	// NOTE(review): presumably keyed by entity (channel or nick) — confirm.
	logs map[string]entityLog
}
71
// entityLog pairs an open log file with a name.
// NOTE(review): name is presumably the log file's name or path — confirm
// against the code that opens the file.
type entityLog struct {
	name string
	file *os.File
}
76
77func connectToUpstream(network *network) (*upstreamConn, error) {
78 logger := &prefixLogger{network.user.srv.Logger, fmt.Sprintf("upstream %q: ", network.Addr)}
79
80 addr := network.Addr
81 if !strings.ContainsRune(addr, ':') {
82 addr = addr + ":6697"
83 }
84
85 logger.Printf("connecting to TLS server at address %q", addr)
86 netConn, err := tls.Dial("tcp", addr, nil)
87 if err != nil {
88 return nil, fmt.Errorf("failed to dial %q: %v", addr, err)
89 }
90
91 setKeepAlive(netConn)
92
93 outgoing := make(chan *irc.Message, 64)
94 uc := &upstreamConn{
95 network: network,
96 logger: logger,
97 net: netConn,
98 irc: irc.NewConn(netConn),
99 srv: network.user.srv,
100 user: network.user,
101 outgoing: outgoing,
102 channels: make(map[string]*upstreamChannel),
103 caps: make(map[string]string),
104 batches: make(map[string]batch),
105 availableChannelTypes: stdChannelTypes,
106 availableChannelModes: stdChannelModes,
107 availableMemberships: stdMemberships,
108 pendingLISTDownstreamSet: make(map[uint64]struct{}),
109 logs: make(map[string]entityLog),
110 }
111
112 go func() {
113 for {
114 var closed bool
115 select {
116 case msg := <-outgoing:
117 if uc.srv.Debug {
118 uc.logger.Printf("sent: %v", msg)
119 }
120 if err := uc.irc.WriteMessage(msg); err != nil {
121 uc.logger.Printf("failed to write message: %v", err)
122 }
123 case <-uc.closed:
124 closed = true
125 }
126 if closed {
127 break
128 }
129 }
130 if err := uc.net.Close(); err != nil {
131 uc.logger.Printf("failed to close connection: %v", err)
132 } else {
133 uc.logger.Printf("connection closed")
134 }
135 }()
136
137 return uc, nil
138}
139
140func (uc *upstreamConn) isClosed() bool {
141 select {
142 case <-uc.closed:
143 return true
144 default:
145 return false
146 }
147}
148
149func (uc *upstreamConn) Close() error {
150 if uc.isClosed() {
151 return fmt.Errorf("upstream connection already closed")
152 }
153 close(uc.closed)
154 for _, log := range uc.logs {
155 log.file.Close()
156 }
157 uc.endPendingLists(true)
158 return nil
159}
160
161func (uc *upstreamConn) forEachDownstream(f func(*downstreamConn)) {
162 uc.user.forEachDownstream(func(dc *downstreamConn) {
163 if dc.network != nil && dc.network != uc.network {
164 return
165 }
166 f(dc)
167 })
168}
169
170func (uc *upstreamConn) forEachDownstreamByID(id uint64, f func(*downstreamConn)) {
171 uc.forEachDownstream(func(dc *downstreamConn) {
172 if id != 0 && id != dc.id {
173 return
174 }
175 f(dc)
176 })
177}
178
179func (uc *upstreamConn) getChannel(name string) (*upstreamChannel, error) {
180 ch, ok := uc.channels[name]
181 if !ok {
182 return nil, fmt.Errorf("unknown channel %q", name)
183 }
184 return ch, nil
185}
186
187func (uc *upstreamConn) isChannel(entity string) bool {
188 if i := strings.IndexByte(uc.availableChannelTypes, entity[0]); i >= 0 {
189 return true
190 }
191 return false
192}
193
194func (uc *upstreamConn) getPendingList() *pendingLIST {
195 uc.user.pendingLISTsLock.Lock()
196 defer uc.user.pendingLISTsLock.Unlock()
197 for _, pl := range uc.user.pendingLISTs {
198 if _, ok := pl.pendingCommands[uc.network.ID]; !ok {
199 continue
200 }
201 return &pl
202 }
203 return nil
204}
205
206func (uc *upstreamConn) endPendingLists(all bool) (found bool) {
207 found = false
208 uc.user.pendingLISTsLock.Lock()
209 defer uc.user.pendingLISTsLock.Unlock()
210 for i := 0; i < len(uc.user.pendingLISTs); i++ {
211 pl := uc.user.pendingLISTs[i]
212 if _, ok := pl.pendingCommands[uc.network.ID]; !ok {
213 continue
214 }
215 delete(pl.pendingCommands, uc.network.ID)
216 if len(pl.pendingCommands) == 0 {
217 uc.user.pendingLISTs = append(uc.user.pendingLISTs[:i], uc.user.pendingLISTs[i+1:]...)
218 i--
219 uc.forEachDownstreamByID(pl.downstreamID, func(dc *downstreamConn) {
220 dc.SendMessage(&irc.Message{
221 Prefix: dc.srv.prefix(),
222 Command: irc.RPL_LISTEND,
223 Params: []string{dc.nick, "End of /LIST"},
224 })
225 })
226 }
227 found = true
228 if !all {
229 delete(uc.pendingLISTDownstreamSet, pl.downstreamID)
230 uc.user.forEachUpstream(func(uc *upstreamConn) {
231 uc.trySendList(pl.downstreamID)
232 })
233 return
234 }
235 }
236 return
237}
238
239func (uc *upstreamConn) trySendList(downstreamID uint64) {
240 // must be called with a lock in uc.user.pendingLISTsLock
241
242 if _, ok := uc.pendingLISTDownstreamSet[downstreamID]; ok {
243 // a LIST command is already pending
244 // we will try again when that command is completed
245 return
246 }
247
248 for _, pl := range uc.user.pendingLISTs {
249 if pl.downstreamID != downstreamID {
250 continue
251 }
252 // this is the first pending LIST command list of the downstream
253 listCommand, ok := pl.pendingCommands[uc.network.ID]
254 if !ok {
255 // there is no command for this upstream in these LIST commands
256 // do not send anything
257 continue
258 }
259 // there is a command for this upstream in these LIST commands
260 // send it now
261
262 uc.SendMessageLabeled(downstreamID, listCommand)
263
264 uc.pendingLISTDownstreamSet[downstreamID] = struct{}{}
265 return
266 }
267}
268
269func (uc *upstreamConn) parseMembershipPrefix(s string) (membership *membership, nick string) {
270 for _, m := range uc.availableMemberships {
271 if m.Prefix == s[0] {
272 return &m, s[1:]
273 }
274 }
275 return nil, s
276}
277
278func (uc *upstreamConn) handleMessage(msg *irc.Message) error {
279 var label string
280 if l, ok := msg.GetTag("label"); ok {
281 label = l
282 }
283
284 var msgBatch *batch
285 if batchName, ok := msg.GetTag("batch"); ok {
286 b, ok := uc.batches[batchName]
287 if !ok {
288 return fmt.Errorf("unexpected batch reference: batch was not defined: %q", batchName)
289 }
290 msgBatch = &b
291 if label == "" {
292 label = msgBatch.Label
293 }
294 }
295
296 var downstreamID uint64 = 0
297 if label != "" {
298 var labelOffset uint64
299 n, err := fmt.Sscanf(label, "sd-%d-%d", &downstreamID, &labelOffset)
300 if err == nil && n < 2 {
301 err = errors.New("not enough arguments")
302 }
303 if err != nil {
304 return fmt.Errorf("unexpected message label: invalid downstream reference for label %q: %v", label, err)
305 }
306 }
307
308 switch msg.Command {
309 case "PING":
310 uc.SendMessage(&irc.Message{
311 Command: "PONG",
312 Params: msg.Params,
313 })
314 return nil
315 case "NOTICE":
316 uc.logger.Print(msg)
317
318 if msg.Prefix.User == "" && msg.Prefix.Host == "" { // server message
319 uc.forEachDownstream(func(dc *downstreamConn) {
320 dc.SendMessage(msg)
321 })
322 } else { // regular user NOTICE
323 var nick, text string
324 if err := parseMessageParams(msg, &nick, &text); err != nil {
325 return err
326 }
327
328 target := nick
329 if nick == uc.nick {
330 target = msg.Prefix.Name
331 }
332 uc.AppendLog(target, "<%s> %s", msg.Prefix.Name, text)
333
334 uc.forEachDownstream(func(dc *downstreamConn) {
335 dc.SendMessage(&irc.Message{
336 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
337 Command: "NOTICE",
338 Params: []string{dc.marshalEntity(uc, nick), text},
339 })
340 })
341 }
342 case "CAP":
343 var subCmd string
344 if err := parseMessageParams(msg, nil, &subCmd); err != nil {
345 return err
346 }
347 subCmd = strings.ToUpper(subCmd)
348 subParams := msg.Params[2:]
349 switch subCmd {
350 case "LS":
351 if len(subParams) < 1 {
352 return newNeedMoreParamsError(msg.Command)
353 }
354 caps := strings.Fields(subParams[len(subParams)-1])
355 more := len(subParams) >= 2 && msg.Params[len(subParams)-2] == "*"
356
357 for _, s := range caps {
358 kv := strings.SplitN(s, "=", 2)
359 k := strings.ToLower(kv[0])
360 var v string
361 if len(kv) == 2 {
362 v = kv[1]
363 }
364 uc.caps[k] = v
365 }
366
367 if more {
368 break // wait to receive all capabilities
369 }
370
371 requestCaps := make([]string, 0, 16)
372 for _, c := range []string{"message-tags", "batch", "labeled-response"} {
373 if _, ok := uc.caps[c]; ok {
374 requestCaps = append(requestCaps, c)
375 }
376 }
377
378 if uc.requestSASL() {
379 requestCaps = append(requestCaps, "sasl")
380 }
381
382 if len(requestCaps) > 0 {
383 uc.SendMessage(&irc.Message{
384 Command: "CAP",
385 Params: []string{"REQ", strings.Join(requestCaps, " ")},
386 })
387 }
388
389 if uc.requestSASL() {
390 break // we'll send CAP END after authentication is completed
391 }
392
393 uc.SendMessage(&irc.Message{
394 Command: "CAP",
395 Params: []string{"END"},
396 })
397 case "ACK", "NAK":
398 if len(subParams) < 1 {
399 return newNeedMoreParamsError(msg.Command)
400 }
401 caps := strings.Fields(subParams[0])
402
403 for _, name := range caps {
404 if err := uc.handleCapAck(strings.ToLower(name), subCmd == "ACK"); err != nil {
405 return err
406 }
407 }
408
409 if uc.saslClient == nil {
410 uc.SendMessage(&irc.Message{
411 Command: "CAP",
412 Params: []string{"END"},
413 })
414 }
415 default:
416 uc.logger.Printf("unhandled message: %v", msg)
417 }
418 case "AUTHENTICATE":
419 if uc.saslClient == nil {
420 return fmt.Errorf("received unexpected AUTHENTICATE message")
421 }
422
423 // TODO: if a challenge is 400 bytes long, buffer it
424 var challengeStr string
425 if err := parseMessageParams(msg, &challengeStr); err != nil {
426 uc.SendMessage(&irc.Message{
427 Command: "AUTHENTICATE",
428 Params: []string{"*"},
429 })
430 return err
431 }
432
433 var challenge []byte
434 if challengeStr != "+" {
435 var err error
436 challenge, err = base64.StdEncoding.DecodeString(challengeStr)
437 if err != nil {
438 uc.SendMessage(&irc.Message{
439 Command: "AUTHENTICATE",
440 Params: []string{"*"},
441 })
442 return err
443 }
444 }
445
446 var resp []byte
447 var err error
448 if !uc.saslStarted {
449 _, resp, err = uc.saslClient.Start()
450 uc.saslStarted = true
451 } else {
452 resp, err = uc.saslClient.Next(challenge)
453 }
454 if err != nil {
455 uc.SendMessage(&irc.Message{
456 Command: "AUTHENTICATE",
457 Params: []string{"*"},
458 })
459 return err
460 }
461
462 // TODO: send response in multiple chunks if >= 400 bytes
463 var respStr = "+"
464 if resp != nil {
465 respStr = base64.StdEncoding.EncodeToString(resp)
466 }
467
468 uc.SendMessage(&irc.Message{
469 Command: "AUTHENTICATE",
470 Params: []string{respStr},
471 })
472 case irc.RPL_LOGGEDIN:
473 var account string
474 if err := parseMessageParams(msg, nil, nil, &account); err != nil {
475 return err
476 }
477 uc.logger.Printf("logged in with account %q", account)
478 case irc.RPL_LOGGEDOUT:
479 uc.logger.Printf("logged out")
480 case irc.ERR_NICKLOCKED, irc.RPL_SASLSUCCESS, irc.ERR_SASLFAIL, irc.ERR_SASLTOOLONG, irc.ERR_SASLABORTED:
481 var info string
482 if err := parseMessageParams(msg, nil, &info); err != nil {
483 return err
484 }
485 switch msg.Command {
486 case irc.ERR_NICKLOCKED:
487 uc.logger.Printf("invalid nick used with SASL authentication: %v", info)
488 case irc.ERR_SASLFAIL:
489 uc.logger.Printf("SASL authentication failed: %v", info)
490 case irc.ERR_SASLTOOLONG:
491 uc.logger.Printf("SASL message too long: %v", info)
492 }
493
494 uc.saslClient = nil
495 uc.saslStarted = false
496
497 uc.SendMessage(&irc.Message{
498 Command: "CAP",
499 Params: []string{"END"},
500 })
501 case irc.RPL_WELCOME:
502 uc.registered = true
503 uc.logger.Printf("connection registered")
504
505 channels, err := uc.srv.db.ListChannels(uc.network.ID)
506 if err != nil {
507 uc.logger.Printf("failed to list channels from database: %v", err)
508 break
509 }
510
511 for _, ch := range channels {
512 params := []string{ch.Name}
513 if ch.Key != "" {
514 params = append(params, ch.Key)
515 }
516 uc.SendMessage(&irc.Message{
517 Command: "JOIN",
518 Params: params,
519 })
520 }
521 case irc.RPL_MYINFO:
522 if err := parseMessageParams(msg, nil, &uc.serverName, nil, &uc.availableUserModes, nil); err != nil {
523 return err
524 }
525 case irc.RPL_ISUPPORT:
526 if err := parseMessageParams(msg, nil, nil); err != nil {
527 return err
528 }
529 for _, token := range msg.Params[1 : len(msg.Params)-1] {
530 negate := false
531 parameter := token
532 value := ""
533 if strings.HasPrefix(token, "-") {
534 negate = true
535 token = token[1:]
536 } else {
537 if i := strings.IndexByte(token, '='); i >= 0 {
538 parameter = token[:i]
539 value = token[i+1:]
540 }
541 }
542 if !negate {
543 switch parameter {
544 case "CHANMODES":
545 parts := strings.SplitN(value, ",", 5)
546 if len(parts) < 4 {
547 return fmt.Errorf("malformed ISUPPORT CHANMODES value: %v", value)
548 }
549 modes := make(map[byte]channelModeType)
550 for i, mt := range []channelModeType{modeTypeA, modeTypeB, modeTypeC, modeTypeD} {
551 for j := 0; j < len(parts[i]); j++ {
552 mode := parts[i][j]
553 modes[mode] = mt
554 }
555 }
556 uc.availableChannelModes = modes
557 case "CHANTYPES":
558 uc.availableChannelTypes = value
559 case "PREFIX":
560 if value == "" {
561 uc.availableMemberships = nil
562 } else {
563 if value[0] != '(' {
564 return fmt.Errorf("malformed ISUPPORT PREFIX value: %v", value)
565 }
566 sep := strings.IndexByte(value, ')')
567 if sep < 0 || len(value) != sep*2 {
568 return fmt.Errorf("malformed ISUPPORT PREFIX value: %v", value)
569 }
570 memberships := make([]membership, len(value)/2-1)
571 for i := range memberships {
572 memberships[i] = membership{
573 Mode: value[i+1],
574 Prefix: value[sep+i+1],
575 }
576 }
577 uc.availableMemberships = memberships
578 }
579 }
580 } else {
581 // TODO: handle ISUPPORT negations
582 }
583 }
584 case "BATCH":
585 var tag string
586 if err := parseMessageParams(msg, &tag); err != nil {
587 return err
588 }
589
590 if strings.HasPrefix(tag, "+") {
591 tag = tag[1:]
592 if _, ok := uc.batches[tag]; ok {
593 return fmt.Errorf("unexpected BATCH reference tag: batch was already defined: %q", tag)
594 }
595 var batchType string
596 if err := parseMessageParams(msg, nil, &batchType); err != nil {
597 return err
598 }
599 label := label
600 if label == "" && msgBatch != nil {
601 label = msgBatch.Label
602 }
603 uc.batches[tag] = batch{
604 Type: batchType,
605 Params: msg.Params[2:],
606 Outer: msgBatch,
607 Label: label,
608 }
609 } else if strings.HasPrefix(tag, "-") {
610 tag = tag[1:]
611 if _, ok := uc.batches[tag]; !ok {
612 return fmt.Errorf("unknown BATCH reference tag: %q", tag)
613 }
614 delete(uc.batches, tag)
615 } else {
616 return fmt.Errorf("unexpected BATCH reference tag: missing +/- prefix: %q", tag)
617 }
618 case "NICK":
619 if msg.Prefix == nil {
620 return fmt.Errorf("expected a prefix")
621 }
622
623 var newNick string
624 if err := parseMessageParams(msg, &newNick); err != nil {
625 return err
626 }
627
628 if msg.Prefix.Name == uc.nick {
629 uc.logger.Printf("changed nick from %q to %q", uc.nick, newNick)
630 uc.nick = newNick
631 }
632
633 for _, ch := range uc.channels {
634 if membership, ok := ch.Members[msg.Prefix.Name]; ok {
635 delete(ch.Members, msg.Prefix.Name)
636 ch.Members[newNick] = membership
637 uc.AppendLog(ch.Name, "*** %s is now known as %s", msg.Prefix.Name, newNick)
638 }
639 }
640
641 if msg.Prefix.Name != uc.nick {
642 uc.forEachDownstream(func(dc *downstreamConn) {
643 dc.SendMessage(&irc.Message{
644 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
645 Command: "NICK",
646 Params: []string{newNick},
647 })
648 })
649 }
650 case "JOIN":
651 if msg.Prefix == nil {
652 return fmt.Errorf("expected a prefix")
653 }
654
655 var channels string
656 if err := parseMessageParams(msg, &channels); err != nil {
657 return err
658 }
659
660 for _, ch := range strings.Split(channels, ",") {
661 if msg.Prefix.Name == uc.nick {
662 uc.logger.Printf("joined channel %q", ch)
663 uc.channels[ch] = &upstreamChannel{
664 Name: ch,
665 conn: uc,
666 Members: make(map[string]*membership),
667 }
668
669 uc.SendMessage(&irc.Message{
670 Command: "MODE",
671 Params: []string{ch},
672 })
673 } else {
674 ch, err := uc.getChannel(ch)
675 if err != nil {
676 return err
677 }
678 ch.Members[msg.Prefix.Name] = nil
679 }
680
681 uc.AppendLog(ch, "*** Joins: %s (%s@%s)", msg.Prefix.Name, msg.Prefix.User, msg.Prefix.Host)
682
683 uc.forEachDownstream(func(dc *downstreamConn) {
684 dc.SendMessage(&irc.Message{
685 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
686 Command: "JOIN",
687 Params: []string{dc.marshalChannel(uc, ch)},
688 })
689 })
690 }
691 case "PART":
692 if msg.Prefix == nil {
693 return fmt.Errorf("expected a prefix")
694 }
695
696 var channels string
697 if err := parseMessageParams(msg, &channels); err != nil {
698 return err
699 }
700
701 var reason string
702 if len(msg.Params) > 1 {
703 reason = msg.Params[1]
704 }
705
706 for _, ch := range strings.Split(channels, ",") {
707 if msg.Prefix.Name == uc.nick {
708 uc.logger.Printf("parted channel %q", ch)
709 delete(uc.channels, ch)
710 } else {
711 ch, err := uc.getChannel(ch)
712 if err != nil {
713 return err
714 }
715 delete(ch.Members, msg.Prefix.Name)
716 }
717
718 uc.AppendLog(ch, "*** Parts: %s (%s@%s) (%s)", msg.Prefix.Name, msg.Prefix.User, msg.Prefix.Host, reason)
719
720 uc.forEachDownstream(func(dc *downstreamConn) {
721 dc.SendMessage(&irc.Message{
722 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
723 Command: "PART",
724 Params: []string{dc.marshalChannel(uc, ch)},
725 })
726 })
727 }
728 case "KICK":
729 if msg.Prefix == nil {
730 return fmt.Errorf("expected a prefix")
731 }
732
733 var channel, user string
734 if err := parseMessageParams(msg, &channel, &user); err != nil {
735 return err
736 }
737
738 var reason string
739 if len(msg.Params) > 2 {
740 reason = msg.Params[1]
741 }
742
743 if user == uc.nick {
744 uc.logger.Printf("kicked from channel %q by %s", channel, msg.Prefix.Name)
745 delete(uc.channels, channel)
746 } else {
747 ch, err := uc.getChannel(channel)
748 if err != nil {
749 return err
750 }
751 delete(ch.Members, user)
752 }
753
754 uc.AppendLog(channel, "*** %s was kicked by %s (%s)", user, msg.Prefix.Name, reason)
755
756 uc.forEachDownstream(func(dc *downstreamConn) {
757 params := []string{dc.marshalChannel(uc, channel), dc.marshalNick(uc, user)}
758 if reason != "" {
759 params = append(params, reason)
760 }
761 dc.SendMessage(&irc.Message{
762 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
763 Command: "KICK",
764 Params: params,
765 })
766 })
767 case "QUIT":
768 if msg.Prefix == nil {
769 return fmt.Errorf("expected a prefix")
770 }
771
772 var reason string
773 if len(msg.Params) > 0 {
774 reason = msg.Params[0]
775 }
776
777 if msg.Prefix.Name == uc.nick {
778 uc.logger.Printf("quit")
779 }
780
781 for _, ch := range uc.channels {
782 if _, ok := ch.Members[msg.Prefix.Name]; ok {
783 delete(ch.Members, msg.Prefix.Name)
784
785 uc.AppendLog(ch.Name, "*** Quits: %s (%s@%s) (%s)", msg.Prefix.Name, msg.Prefix.User, msg.Prefix.Host, reason)
786 }
787 }
788
789 if msg.Prefix.Name != uc.nick {
790 uc.forEachDownstream(func(dc *downstreamConn) {
791 dc.SendMessage(&irc.Message{
792 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
793 Command: "QUIT",
794 Params: msg.Params,
795 })
796 })
797 }
798 case irc.RPL_TOPIC, irc.RPL_NOTOPIC:
799 var name, topic string
800 if err := parseMessageParams(msg, nil, &name, &topic); err != nil {
801 return err
802 }
803 ch, err := uc.getChannel(name)
804 if err != nil {
805 return err
806 }
807 if msg.Command == irc.RPL_TOPIC {
808 ch.Topic = topic
809 } else {
810 ch.Topic = ""
811 }
812 case "TOPIC":
813 var name string
814 if err := parseMessageParams(msg, &name); err != nil {
815 return err
816 }
817 ch, err := uc.getChannel(name)
818 if err != nil {
819 return err
820 }
821 if len(msg.Params) > 1 {
822 ch.Topic = msg.Params[1]
823 } else {
824 ch.Topic = ""
825 }
826 uc.forEachDownstream(func(dc *downstreamConn) {
827 params := []string{dc.marshalChannel(uc, name)}
828 if ch.Topic != "" {
829 params = append(params, ch.Topic)
830 }
831 dc.SendMessage(&irc.Message{
832 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
833 Command: "TOPIC",
834 Params: params,
835 })
836 })
837 case "MODE":
838 var name, modeStr string
839 if err := parseMessageParams(msg, &name, &modeStr); err != nil {
840 return err
841 }
842
843 if !uc.isChannel(name) { // user mode change
844 if name != uc.nick {
845 return fmt.Errorf("received MODE message for unknown nick %q", name)
846 }
847 return uc.modes.Apply(modeStr)
848 // TODO: notify downstreams about user mode change?
849 } else { // channel mode change
850 ch, err := uc.getChannel(name)
851 if err != nil {
852 return err
853 }
854
855 if ch.modes != nil {
856 if err := ch.modes.Apply(uc.availableChannelModes, modeStr, msg.Params[2:]...); err != nil {
857 return err
858 }
859 }
860
861 modeMsg := modeStr
862 for _, v := range msg.Params[2:] {
863 modeMsg += " " + v
864 }
865 uc.AppendLog(ch.Name, "*** %s sets mode: %s", msg.Prefix.Name, modeMsg)
866
867 uc.forEachDownstream(func(dc *downstreamConn) {
868 params := []string{dc.marshalChannel(uc, name), modeStr}
869 params = append(params, msg.Params[2:]...)
870
871 dc.SendMessage(&irc.Message{
872 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
873 Command: "MODE",
874 Params: params,
875 })
876 })
877 }
878 case irc.RPL_UMODEIS:
879 if err := parseMessageParams(msg, nil); err != nil {
880 return err
881 }
882 modeStr := ""
883 if len(msg.Params) > 1 {
884 modeStr = msg.Params[1]
885 }
886
887 uc.modes = ""
888 if err := uc.modes.Apply(modeStr); err != nil {
889 return err
890 }
891 // TODO: send RPL_UMODEIS to downstream connections when applicable
892 case irc.RPL_CHANNELMODEIS:
893 var channel string
894 if err := parseMessageParams(msg, nil, &channel); err != nil {
895 return err
896 }
897 modeStr := ""
898 if len(msg.Params) > 2 {
899 modeStr = msg.Params[2]
900 }
901
902 ch, err := uc.getChannel(channel)
903 if err != nil {
904 return err
905 }
906
907 firstMode := ch.modes == nil
908 ch.modes = make(map[byte]string)
909 if err := ch.modes.Apply(uc.availableChannelModes, modeStr, msg.Params[3:]...); err != nil {
910 return err
911 }
912 if firstMode {
913 modeStr, modeParams := ch.modes.Format()
914
915 uc.forEachDownstream(func(dc *downstreamConn) {
916 params := []string{dc.nick, dc.marshalChannel(uc, channel), modeStr}
917 params = append(params, modeParams...)
918
919 dc.SendMessage(&irc.Message{
920 Prefix: dc.srv.prefix(),
921 Command: irc.RPL_CHANNELMODEIS,
922 Params: params,
923 })
924 })
925 }
926 case rpl_creationtime:
927 var channel, creationTime string
928 if err := parseMessageParams(msg, nil, &channel, &creationTime); err != nil {
929 return err
930 }
931
932 ch, err := uc.getChannel(channel)
933 if err != nil {
934 return err
935 }
936
937 firstCreationTime := ch.creationTime == ""
938 ch.creationTime = creationTime
939 if firstCreationTime {
940 uc.forEachDownstream(func(dc *downstreamConn) {
941 dc.SendMessage(&irc.Message{
942 Prefix: dc.srv.prefix(),
943 Command: rpl_creationtime,
944 Params: []string{dc.nick, channel, creationTime},
945 })
946 })
947 }
948 case rpl_topicwhotime:
949 var name, who, timeStr string
950 if err := parseMessageParams(msg, nil, &name, &who, &timeStr); err != nil {
951 return err
952 }
953 ch, err := uc.getChannel(name)
954 if err != nil {
955 return err
956 }
957 ch.TopicWho = who
958 sec, err := strconv.ParseInt(timeStr, 10, 64)
959 if err != nil {
960 return fmt.Errorf("failed to parse topic time: %v", err)
961 }
962 ch.TopicTime = time.Unix(sec, 0)
963 case irc.RPL_LIST:
964 var channel, clients, topic string
965 if err := parseMessageParams(msg, nil, &channel, &clients, &topic); err != nil {
966 return err
967 }
968
969 pl := uc.getPendingList()
970 if pl == nil {
971 return fmt.Errorf("unexpected RPL_LIST: no matching pending LIST")
972 }
973
974 uc.forEachDownstreamByID(pl.downstreamID, func(dc *downstreamConn) {
975 dc.SendMessage(&irc.Message{
976 Prefix: dc.srv.prefix(),
977 Command: irc.RPL_LIST,
978 Params: []string{dc.nick, dc.marshalChannel(uc, channel), clients, topic},
979 })
980 })
981 case irc.RPL_LISTEND:
982 ok := uc.endPendingLists(false)
983 if !ok {
984 return fmt.Errorf("unexpected RPL_LISTEND: no matching pending LIST")
985 }
986 case irc.RPL_NAMREPLY:
987 var name, statusStr, members string
988 if err := parseMessageParams(msg, nil, &statusStr, &name, &members); err != nil {
989 return err
990 }
991
992 ch, ok := uc.channels[name]
993 if !ok {
994 // NAMES on a channel we have not joined, forward to downstream
995 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
996 channel := dc.marshalChannel(uc, name)
997 members := splitSpace(members)
998 for i, member := range members {
999 membership, nick := uc.parseMembershipPrefix(member)
1000 members[i] = membership.String() + dc.marshalNick(uc, nick)
1001 }
1002 memberStr := strings.Join(members, " ")
1003
1004 dc.SendMessage(&irc.Message{
1005 Prefix: dc.srv.prefix(),
1006 Command: irc.RPL_NAMREPLY,
1007 Params: []string{dc.nick, statusStr, channel, memberStr},
1008 })
1009 })
1010 return nil
1011 }
1012
1013 status, err := parseChannelStatus(statusStr)
1014 if err != nil {
1015 return err
1016 }
1017 ch.Status = status
1018
1019 for _, s := range splitSpace(members) {
1020 membership, nick := uc.parseMembershipPrefix(s)
1021 ch.Members[nick] = membership
1022 }
1023 case irc.RPL_ENDOFNAMES:
1024 var name string
1025 if err := parseMessageParams(msg, nil, &name); err != nil {
1026 return err
1027 }
1028
1029 ch, ok := uc.channels[name]
1030 if !ok {
1031 // NAMES on a channel we have not joined, forward to downstream
1032 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1033 channel := dc.marshalChannel(uc, name)
1034
1035 dc.SendMessage(&irc.Message{
1036 Prefix: dc.srv.prefix(),
1037 Command: irc.RPL_ENDOFNAMES,
1038 Params: []string{dc.nick, channel, "End of /NAMES list"},
1039 })
1040 })
1041 return nil
1042 }
1043
1044 if ch.complete {
1045 return fmt.Errorf("received unexpected RPL_ENDOFNAMES")
1046 }
1047 ch.complete = true
1048
1049 uc.forEachDownstream(func(dc *downstreamConn) {
1050 forwardChannel(dc, ch)
1051 })
1052 case irc.RPL_WHOREPLY:
1053 var channel, username, host, server, nick, mode, trailing string
1054 if err := parseMessageParams(msg, nil, &channel, &username, &host, &server, &nick, &mode, &trailing); err != nil {
1055 return err
1056 }
1057
1058 parts := strings.SplitN(trailing, " ", 2)
1059 if len(parts) != 2 {
1060 return fmt.Errorf("received malformed RPL_WHOREPLY: wrong trailing parameter: %s", trailing)
1061 }
1062 realname := parts[1]
1063 hops, err := strconv.Atoi(parts[0])
1064 if err != nil {
1065 return fmt.Errorf("received malformed RPL_WHOREPLY: wrong hop count: %s", parts[0])
1066 }
1067 hops++
1068
1069 trailing = strconv.Itoa(hops) + " " + realname
1070
1071 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1072 channel := channel
1073 if channel != "*" {
1074 channel = dc.marshalChannel(uc, channel)
1075 }
1076 nick := dc.marshalNick(uc, nick)
1077 dc.SendMessage(&irc.Message{
1078 Prefix: dc.srv.prefix(),
1079 Command: irc.RPL_WHOREPLY,
1080 Params: []string{dc.nick, channel, username, host, server, nick, mode, trailing},
1081 })
1082 })
1083 case irc.RPL_ENDOFWHO:
1084 var name string
1085 if err := parseMessageParams(msg, nil, &name); err != nil {
1086 return err
1087 }
1088
1089 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1090 name := name
1091 if name != "*" {
1092 // TODO: support WHO masks
1093 name = dc.marshalEntity(uc, name)
1094 }
1095 dc.SendMessage(&irc.Message{
1096 Prefix: dc.srv.prefix(),
1097 Command: irc.RPL_ENDOFWHO,
1098 Params: []string{dc.nick, name, "End of /WHO list"},
1099 })
1100 })
1101 case irc.RPL_WHOISUSER:
1102 var nick, username, host, realname string
1103 if err := parseMessageParams(msg, nil, &nick, &username, &host, nil, &realname); err != nil {
1104 return err
1105 }
1106
1107 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1108 nick := dc.marshalNick(uc, nick)
1109 dc.SendMessage(&irc.Message{
1110 Prefix: dc.srv.prefix(),
1111 Command: irc.RPL_WHOISUSER,
1112 Params: []string{dc.nick, nick, username, host, "*", realname},
1113 })
1114 })
1115 case irc.RPL_WHOISSERVER:
1116 var nick, server, serverInfo string
1117 if err := parseMessageParams(msg, nil, &nick, &server, &serverInfo); err != nil {
1118 return err
1119 }
1120
1121 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1122 nick := dc.marshalNick(uc, nick)
1123 dc.SendMessage(&irc.Message{
1124 Prefix: dc.srv.prefix(),
1125 Command: irc.RPL_WHOISSERVER,
1126 Params: []string{dc.nick, nick, server, serverInfo},
1127 })
1128 })
1129 case irc.RPL_WHOISOPERATOR:
1130 var nick string
1131 if err := parseMessageParams(msg, nil, &nick); err != nil {
1132 return err
1133 }
1134
1135 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1136 nick := dc.marshalNick(uc, nick)
1137 dc.SendMessage(&irc.Message{
1138 Prefix: dc.srv.prefix(),
1139 Command: irc.RPL_WHOISOPERATOR,
1140 Params: []string{dc.nick, nick, "is an IRC operator"},
1141 })
1142 })
1143 case irc.RPL_WHOISIDLE:
1144 var nick string
1145 if err := parseMessageParams(msg, nil, &nick, nil); err != nil {
1146 return err
1147 }
1148
1149 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1150 nick := dc.marshalNick(uc, nick)
1151 params := []string{dc.nick, nick}
1152 params = append(params, msg.Params[2:]...)
1153 dc.SendMessage(&irc.Message{
1154 Prefix: dc.srv.prefix(),
1155 Command: irc.RPL_WHOISIDLE,
1156 Params: params,
1157 })
1158 })
1159 case irc.RPL_WHOISCHANNELS:
1160 var nick, channelList string
1161 if err := parseMessageParams(msg, nil, &nick, &channelList); err != nil {
1162 return err
1163 }
1164 channels := splitSpace(channelList)
1165
1166 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1167 nick := dc.marshalNick(uc, nick)
1168 channelList := make([]string, len(channels))
1169 for i, channel := range channels {
1170 prefix, channel := uc.parseMembershipPrefix(channel)
1171 channel = dc.marshalChannel(uc, channel)
1172 channelList[i] = prefix.String() + channel
1173 }
1174 channels := strings.Join(channelList, " ")
1175 dc.SendMessage(&irc.Message{
1176 Prefix: dc.srv.prefix(),
1177 Command: irc.RPL_WHOISCHANNELS,
1178 Params: []string{dc.nick, nick, channels},
1179 })
1180 })
1181 case irc.RPL_ENDOFWHOIS:
1182 var nick string
1183 if err := parseMessageParams(msg, nil, &nick); err != nil {
1184 return err
1185 }
1186
1187 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1188 nick := dc.marshalNick(uc, nick)
1189 dc.SendMessage(&irc.Message{
1190 Prefix: dc.srv.prefix(),
1191 Command: irc.RPL_ENDOFWHOIS,
1192 Params: []string{dc.nick, nick, "End of /WHOIS list"},
1193 })
1194 })
1195 case "PRIVMSG":
1196 if msg.Prefix == nil {
1197 return fmt.Errorf("expected a prefix")
1198 }
1199
1200 var nick, text string
1201 if err := parseMessageParams(msg, &nick, &text); err != nil {
1202 return err
1203 }
1204
1205 if msg.Prefix.Name == serviceNick {
1206 uc.logger.Printf("skipping PRIVMSG from soju's service: %v", msg)
1207 break
1208 }
1209 if nick == serviceNick {
1210 uc.logger.Printf("skipping PRIVMSG to soju's service: %v", msg)
1211 break
1212 }
1213
1214 target := nick
1215 if nick == uc.nick {
1216 target = msg.Prefix.Name
1217 }
1218 uc.AppendLog(target, "<%s> %s", msg.Prefix.Name, text)
1219
1220 uc.network.ring.Produce(msg)
1221 case "INVITE":
1222 var nick string
1223 var channel string
1224 if err := parseMessageParams(msg, &nick, &channel); err != nil {
1225 return err
1226 }
1227
1228 uc.forEachDownstream(func(dc *downstreamConn) {
1229 dc.SendMessage(&irc.Message{
1230 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
1231 Command: "INVITE",
1232 Params: []string{dc.marshalNick(uc, nick), dc.marshalChannel(uc, channel)},
1233 })
1234 })
1235 case irc.RPL_INVITING:
1236 var nick string
1237 var channel string
1238 if err := parseMessageParams(msg, &nick, &channel); err != nil {
1239 return err
1240 }
1241
1242 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1243 dc.SendMessage(&irc.Message{
1244 Prefix: dc.srv.prefix(),
1245 Command: irc.RPL_INVITING,
1246 Params: []string{dc.nick, dc.marshalNick(uc, nick), dc.marshalChannel(uc, channel)},
1247 })
1248 })
1249 case irc.ERR_UNKNOWNCOMMAND, irc.RPL_TRYAGAIN:
1250 var command, reason string
1251 if err := parseMessageParams(msg, nil, &command, &reason); err != nil {
1252 return err
1253 }
1254
1255 if command == "LIST" {
1256 ok := uc.endPendingLists(false)
1257 if !ok {
1258 return fmt.Errorf("unexpected response for LIST: %q: no matching pending LIST", msg.Command)
1259 }
1260 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1261 dc.SendMessage(&irc.Message{
1262 Prefix: uc.srv.prefix(),
1263 Command: msg.Command,
1264 Params: []string{dc.nick, "LIST", reason},
1265 })
1266 })
1267 }
1268 case "TAGMSG":
1269 // TODO: relay to downstream connections that accept message-tags
1270 case "ACK":
1271 // Ignore
1272 case irc.RPL_YOURHOST, irc.RPL_CREATED:
1273 // Ignore
1274 case irc.RPL_LUSERCLIENT, irc.RPL_LUSEROP, irc.RPL_LUSERUNKNOWN, irc.RPL_LUSERCHANNELS, irc.RPL_LUSERME:
1275 // Ignore
1276 case irc.RPL_MOTDSTART, irc.RPL_MOTD, irc.RPL_ENDOFMOTD:
1277 // Ignore
1278 case irc.RPL_LISTSTART:
1279 // Ignore
1280 case rpl_localusers, rpl_globalusers:
1281 // Ignore
1282 case irc.RPL_STATSVLINE, rpl_statsping, irc.RPL_STATSBLINE, irc.RPL_STATSDLINE:
1283 // Ignore
1284 default:
1285 uc.logger.Printf("unhandled message: %v", msg)
1286 }
1287 return nil
1288}
1289
// splitSpace cuts s into the substrings separated by one or more ASCII space
// characters (' '). Other whitespace, such as tabs, is not treated as a
// separator. Leading, trailing and repeated spaces never produce empty
// elements; a string holding only spaces (or nothing) yields an empty slice.
func splitSpace(s string) []string {
	fields := make([]string, 0, strings.Count(s, " ")+1)
	for _, piece := range strings.Split(s, " ") {
		// Consecutive separators show up as empty pieces: drop them.
		if piece == "" {
			continue
		}
		fields = append(fields, piece)
	}
	return fields
}
1295
1296func (uc *upstreamConn) register() {
1297 uc.nick = uc.network.Nick
1298 uc.username = uc.network.Username
1299 if uc.username == "" {
1300 uc.username = uc.nick
1301 }
1302 uc.realname = uc.network.Realname
1303 if uc.realname == "" {
1304 uc.realname = uc.nick
1305 }
1306
1307 uc.SendMessage(&irc.Message{
1308 Command: "CAP",
1309 Params: []string{"LS", "302"},
1310 })
1311
1312 if uc.network.Pass != "" {
1313 uc.SendMessage(&irc.Message{
1314 Command: "PASS",
1315 Params: []string{uc.network.Pass},
1316 })
1317 }
1318
1319 uc.SendMessage(&irc.Message{
1320 Command: "NICK",
1321 Params: []string{uc.nick},
1322 })
1323 uc.SendMessage(&irc.Message{
1324 Command: "USER",
1325 Params: []string{uc.username, "0", "*", uc.realname},
1326 })
1327}
1328
1329func (uc *upstreamConn) requestSASL() bool {
1330 if uc.network.SASL.Mechanism == "" {
1331 return false
1332 }
1333
1334 v, ok := uc.caps["sasl"]
1335 if !ok {
1336 return false
1337 }
1338 if v != "" {
1339 mechanisms := strings.Split(v, ",")
1340 found := false
1341 for _, mech := range mechanisms {
1342 if strings.EqualFold(mech, uc.network.SASL.Mechanism) {
1343 found = true
1344 break
1345 }
1346 }
1347 if !found {
1348 return false
1349 }
1350 }
1351
1352 return true
1353}
1354
1355func (uc *upstreamConn) handleCapAck(name string, ok bool) error {
1356 auth := &uc.network.SASL
1357 switch name {
1358 case "sasl":
1359 if !ok {
1360 uc.logger.Printf("server refused to acknowledge the SASL capability")
1361 return nil
1362 }
1363
1364 switch auth.Mechanism {
1365 case "PLAIN":
1366 uc.logger.Printf("starting SASL PLAIN authentication with username %q", auth.Plain.Username)
1367 uc.saslClient = sasl.NewPlainClient("", auth.Plain.Username, auth.Plain.Password)
1368 default:
1369 return fmt.Errorf("unsupported SASL mechanism %q", name)
1370 }
1371
1372 uc.SendMessage(&irc.Message{
1373 Command: "AUTHENTICATE",
1374 Params: []string{auth.Mechanism},
1375 })
1376 case "message-tags":
1377 uc.tagsSupported = ok
1378 case "labeled-response":
1379 uc.labelsSupported = ok
1380 }
1381 return nil
1382}
1383
1384func (uc *upstreamConn) readMessages(ch chan<- event) error {
1385 for {
1386 msg, err := uc.irc.ReadMessage()
1387 if err == io.EOF {
1388 break
1389 } else if err != nil {
1390 return fmt.Errorf("failed to read IRC command: %v", err)
1391 }
1392
1393 if uc.srv.Debug {
1394 uc.logger.Printf("received: %v", msg)
1395 }
1396
1397 ch <- eventUpstreamMessage{msg, uc}
1398 }
1399
1400 return nil
1401}
1402
1403func (uc *upstreamConn) SendMessage(msg *irc.Message) {
1404 uc.outgoing <- msg
1405}
1406
1407func (uc *upstreamConn) SendMessageLabeled(downstreamID uint64, msg *irc.Message) {
1408 if uc.labelsSupported {
1409 if msg.Tags == nil {
1410 msg.Tags = make(map[string]irc.TagValue)
1411 }
1412 msg.Tags["label"] = irc.TagValue(fmt.Sprintf("sd-%d-%d", downstreamID, uc.nextLabelID))
1413 uc.nextLabelID++
1414 }
1415 uc.SendMessage(msg)
1416}
1417
1418// TODO: handle moving logs when a network name changes, when support for this is added
1419func (uc *upstreamConn) AppendLog(entity string, format string, a ...interface{}) {
1420 if uc.srv.LogPath == "" {
1421 return
1422 }
1423 // TODO: enforce maximum open file handles (LRU cache of file handles)
1424 // TODO: handle non-monotonic clock behaviour
1425 now := time.Now()
1426 year, month, day := now.Date()
1427 name := fmt.Sprintf("%04d-%02d-%02d.log", year, month, day)
1428 log, ok := uc.logs[entity]
1429 if !ok || log.name != name {
1430 if ok {
1431 log.file.Close()
1432 delete(uc.logs, entity)
1433 }
1434 // TODO: handle/forbid network/entity names with illegal path characters
1435 dir := filepath.Join(uc.srv.LogPath, uc.user.Username, uc.network.Name, entity)
1436 if err := os.MkdirAll(dir, 0600); err != nil {
1437 uc.logger.Printf("failed to log message: could not create logs directory %q: %v", dir, err)
1438 return
1439 }
1440 path := filepath.Join(dir, name)
1441 f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0600)
1442 if err != nil {
1443 uc.logger.Printf("failed to log message: could not open or create log file %q: %v", path, err)
1444 return
1445 }
1446 log = entityLog{
1447 name: name,
1448 file: f,
1449 }
1450 uc.logs[entity] = log
1451 }
1452
1453 format = "[%02d:%02d:%02d] " + format + "\n"
1454 args := []interface{}{now.Hour(), now.Minute(), now.Second()}
1455 args = append(args, a...)
1456
1457 if _, err := fmt.Fprintf(log.file, format, args...); err != nil {
1458 uc.logger.Printf("failed to log message to %q: %v", log.name, err)
1459 }
1460}
Note: See TracBrowser for help on using the repository browser.