source: code/trunk/upstream.go@ 177

Last change on this file since 177 was 177, checked in by delthas, 5 years ago

Add LIST support

This commit adds support for downstream LIST messages from multiple
concurrent downstreams to multiple concurrent upstreams, including
support for multiple pending LIST requests from the same downstream.

Because a unique RPL_LISTEND message must be sent to the requesting
downstream, and because there might be multiple upstreams, each sending
their own RPL_LISTEND, a cache of RPL_LISTEND replies of some sort is
required to match the RPL_LISTEND replies together in order to only send
one back downstream.

This commit adds a list of "pending LIST" structs, which each contain a
map of all upstreams that yet need to send a RPL_LISTEND, and the
corresponding LIST request associated with that response. This list of
pending LISTs is sorted according to the order that the requesting
downstreams sent the LIST messages in. Each pending set also stores the
id of the requesting downstream, in order to only forward the replies to
it and no other downstream. (This is important because LIST replies can
typically amount to several thousand messages on large servers.)

When a single downstream makes multiple LIST requests, only the first
one will be immediately sent to the upstream servers. The next ones will
be buffered until the first one is completed. Distinct downstreams can
make concurrent LIST requests without any request buffering.

Each RPL_LIST message is forwarded to the downstream of the first
matching pending LIST struct.

When an upstream sends an RPL_LISTEND message, the upstream is removed
from the first matching pending LIST struct, but that message is not
immediately forwarded downstream. If there are no remaining pending LIST
requests in that struct, that means all upstreams have
sent back all their RPL_LISTEND replies (which means they also sent all
their RPL_LIST replies); so a unique RPL_LISTEND is sent to downstream
and that pending LIST set is removed from the cache.

Upstreams are removed from the pending LIST structs in two other cases:

  • when they are closed (to avoid stalling because of a disconnected
    upstream that will never reply to the LIST message): they are removed
    from all pending LIST structs

  • when they reply with an ERR_UNKNOWNCOMMAND or RPL_TRYAGAIN LIST reply,
    which is typically used when a user is not allowed to LIST because they
    just joined the server: they are removed from the first pending LIST
    struct, as if an RPL_LISTEND message was received

File size: 33.4 KB
Line 
1package soju
2
3import (
4 "crypto/tls"
5 "encoding/base64"
6 "errors"
7 "fmt"
8 "io"
9 "net"
10 "strconv"
11 "strings"
12 "time"
13
14 "github.com/emersion/go-sasl"
15 "gopkg.in/irc.v3"
16)
17
18type upstreamChannel struct {
19 Name string
20 conn *upstreamConn
21 Topic string
22 TopicWho string
23 TopicTime time.Time
24 Status channelStatus
25 modes channelModes
26 creationTime string
27 Members map[string]*membership
28 complete bool
29}
30
31type upstreamConn struct {
32 network *network
33 logger Logger
34 net net.Conn
35 irc *irc.Conn
36 srv *Server
37 user *user
38 outgoing chan<- *irc.Message
39 closed chan struct{}
40
41 serverName string
42 availableUserModes string
43 availableChannelModes map[byte]channelModeType
44 availableChannelTypes string
45 availableMemberships []membership
46
47 registered bool
48 nick string
49 username string
50 realname string
51 modes userModes
52 channels map[string]*upstreamChannel
53 caps map[string]string
54 batches map[string]batch
55
56 tagsSupported bool
57 labelsSupported bool
58 nextLabelID uint64
59
60 saslClient sasl.Client
61 saslStarted bool
62
63 // set of LIST commands in progress, per downstream
64 // access is synchronized with user.pendingLISTsLock
65 pendingLISTDownstreamSet map[uint64]struct{}
66}
67
68func connectToUpstream(network *network) (*upstreamConn, error) {
69 logger := &prefixLogger{network.user.srv.Logger, fmt.Sprintf("upstream %q: ", network.Addr)}
70
71 addr := network.Addr
72 if !strings.ContainsRune(addr, ':') {
73 addr = addr + ":6697"
74 }
75
76 logger.Printf("connecting to TLS server at address %q", addr)
77 netConn, err := tls.Dial("tcp", addr, nil)
78 if err != nil {
79 return nil, fmt.Errorf("failed to dial %q: %v", addr, err)
80 }
81
82 setKeepAlive(netConn)
83
84 outgoing := make(chan *irc.Message, 64)
85 uc := &upstreamConn{
86 network: network,
87 logger: logger,
88 net: netConn,
89 irc: irc.NewConn(netConn),
90 srv: network.user.srv,
91 user: network.user,
92 outgoing: outgoing,
93 channels: make(map[string]*upstreamChannel),
94 caps: make(map[string]string),
95 batches: make(map[string]batch),
96 availableChannelTypes: stdChannelTypes,
97 availableChannelModes: stdChannelModes,
98 availableMemberships: stdMemberships,
99 pendingLISTDownstreamSet: make(map[uint64]struct{}),
100 }
101
102 go func() {
103 for {
104 var closed bool
105 select {
106 case msg := <-outgoing:
107 if uc.srv.Debug {
108 uc.logger.Printf("sent: %v", msg)
109 }
110 if err := uc.irc.WriteMessage(msg); err != nil {
111 uc.logger.Printf("failed to write message: %v", err)
112 }
113 case <-uc.closed:
114 closed = true
115 }
116 if closed {
117 break
118 }
119 }
120 if err := uc.net.Close(); err != nil {
121 uc.logger.Printf("failed to close connection: %v", err)
122 } else {
123 uc.logger.Printf("connection closed")
124 }
125 }()
126
127 return uc, nil
128}
129
130func (uc *upstreamConn) isClosed() bool {
131 select {
132 case <-uc.closed:
133 return true
134 default:
135 return false
136 }
137}
138
139func (uc *upstreamConn) Close() error {
140 if uc.isClosed() {
141 return fmt.Errorf("upstream connection already closed")
142 }
143 close(uc.closed)
144
145 uc.endPendingLists(true)
146 return nil
147}
148
149func (uc *upstreamConn) forEachDownstream(f func(*downstreamConn)) {
150 uc.user.forEachDownstream(func(dc *downstreamConn) {
151 if dc.network != nil && dc.network != uc.network {
152 return
153 }
154 f(dc)
155 })
156}
157
158func (uc *upstreamConn) forEachDownstreamByID(id uint64, f func(*downstreamConn)) {
159 uc.forEachDownstream(func(dc *downstreamConn) {
160 if id != 0 && id != dc.id {
161 return
162 }
163 f(dc)
164 })
165}
166
167func (uc *upstreamConn) getChannel(name string) (*upstreamChannel, error) {
168 ch, ok := uc.channels[name]
169 if !ok {
170 return nil, fmt.Errorf("unknown channel %q", name)
171 }
172 return ch, nil
173}
174
175func (uc *upstreamConn) isChannel(entity string) bool {
176 if i := strings.IndexByte(uc.availableChannelTypes, entity[0]); i >= 0 {
177 return true
178 }
179 return false
180}
181
182func (uc *upstreamConn) getPendingList() *pendingLIST {
183 uc.user.pendingLISTsLock.Lock()
184 defer uc.user.pendingLISTsLock.Unlock()
185 for _, pl := range uc.user.pendingLISTs {
186 if _, ok := pl.pendingCommands[uc.network.ID]; !ok {
187 continue
188 }
189 return &pl
190 }
191 return nil
192}
193
194func (uc *upstreamConn) endPendingLists(all bool) (found bool) {
195 found = false
196 uc.user.pendingLISTsLock.Lock()
197 defer uc.user.pendingLISTsLock.Unlock()
198 for i := 0; i < len(uc.user.pendingLISTs); i++ {
199 pl := uc.user.pendingLISTs[i]
200 if _, ok := pl.pendingCommands[uc.network.ID]; !ok {
201 continue
202 }
203 delete(pl.pendingCommands, uc.network.ID)
204 if len(pl.pendingCommands) == 0 {
205 uc.user.pendingLISTs = append(uc.user.pendingLISTs[:i], uc.user.pendingLISTs[i+1:]...)
206 i--
207 uc.forEachDownstreamByID(pl.downstreamID, func(dc *downstreamConn) {
208 dc.SendMessage(&irc.Message{
209 Prefix: dc.srv.prefix(),
210 Command: irc.RPL_LISTEND,
211 Params: []string{dc.nick, "End of /LIST"},
212 })
213 })
214 }
215 found = true
216 if !all {
217 delete(uc.pendingLISTDownstreamSet, pl.downstreamID)
218 uc.user.forEachUpstream(func(uc *upstreamConn) {
219 uc.trySendList(pl.downstreamID)
220 })
221 return
222 }
223 }
224 return
225}
226
227func (uc *upstreamConn) trySendList(downstreamID uint64) {
228 // must be called with a lock in uc.user.pendingLISTsLock
229
230 if _, ok := uc.pendingLISTDownstreamSet[downstreamID]; ok {
231 // a LIST command is already pending
232 // we will try again when that command is completed
233 return
234 }
235
236 for _, pl := range uc.user.pendingLISTs {
237 if pl.downstreamID != downstreamID {
238 continue
239 }
240 // this is the first pending LIST command list of the downstream
241 listCommand, ok := pl.pendingCommands[uc.network.ID]
242 if !ok {
243 // there is no command for this upstream in these LIST commands
244 // do not send anything
245 continue
246 }
247 // there is a command for this upstream in these LIST commands
248 // send it now
249
250 uc.SendMessageLabeled(downstreamID, listCommand)
251
252 uc.pendingLISTDownstreamSet[downstreamID] = struct{}{}
253 return
254 }
255}
256
257func (uc *upstreamConn) parseMembershipPrefix(s string) (membership *membership, nick string) {
258 for _, m := range uc.availableMemberships {
259 if m.Prefix == s[0] {
260 return &m, s[1:]
261 }
262 }
263 return nil, s
264}
265
266func (uc *upstreamConn) handleMessage(msg *irc.Message) error {
267 var label string
268 if l, ok := msg.GetTag("label"); ok {
269 label = l
270 }
271
272 var msgBatch *batch
273 if batchName, ok := msg.GetTag("batch"); ok {
274 b, ok := uc.batches[batchName]
275 if !ok {
276 return fmt.Errorf("unexpected batch reference: batch was not defined: %q", batchName)
277 }
278 msgBatch = &b
279 if label == "" {
280 label = msgBatch.Label
281 }
282 }
283
284 var downstreamID uint64 = 0
285 if label != "" {
286 var labelOffset uint64
287 n, err := fmt.Sscanf(label, "sd-%d-%d", &downstreamID, &labelOffset)
288 if err == nil && n < 2 {
289 err = errors.New("not enough arguments")
290 }
291 if err != nil {
292 return fmt.Errorf("unexpected message label: invalid downstream reference for label %q: %v", label, err)
293 }
294 }
295
296 switch msg.Command {
297 case "PING":
298 uc.SendMessage(&irc.Message{
299 Command: "PONG",
300 Params: msg.Params,
301 })
302 return nil
303 case "NOTICE":
304 uc.logger.Print(msg)
305
306 if msg.Prefix.User == "" && msg.Prefix.Host == "" { // server message
307 uc.forEachDownstream(func(dc *downstreamConn) {
308 dc.SendMessage(msg)
309 })
310 } else { // regular user NOTICE
311 var nick, text string
312 if err := parseMessageParams(msg, &nick, &text); err != nil {
313 return err
314 }
315
316 uc.forEachDownstream(func(dc *downstreamConn) {
317 dc.SendMessage(&irc.Message{
318 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
319 Command: "NOTICE",
320 Params: []string{dc.marshalEntity(uc, nick), text},
321 })
322 })
323 }
324 case "CAP":
325 var subCmd string
326 if err := parseMessageParams(msg, nil, &subCmd); err != nil {
327 return err
328 }
329 subCmd = strings.ToUpper(subCmd)
330 subParams := msg.Params[2:]
331 switch subCmd {
332 case "LS":
333 if len(subParams) < 1 {
334 return newNeedMoreParamsError(msg.Command)
335 }
336 caps := strings.Fields(subParams[len(subParams)-1])
337 more := len(subParams) >= 2 && msg.Params[len(subParams)-2] == "*"
338
339 for _, s := range caps {
340 kv := strings.SplitN(s, "=", 2)
341 k := strings.ToLower(kv[0])
342 var v string
343 if len(kv) == 2 {
344 v = kv[1]
345 }
346 uc.caps[k] = v
347 }
348
349 if more {
350 break // wait to receive all capabilities
351 }
352
353 requestCaps := make([]string, 0, 16)
354 for _, c := range []string{"message-tags", "batch", "labeled-response"} {
355 if _, ok := uc.caps[c]; ok {
356 requestCaps = append(requestCaps, c)
357 }
358 }
359
360 if uc.requestSASL() {
361 requestCaps = append(requestCaps, "sasl")
362 }
363
364 if len(requestCaps) > 0 {
365 uc.SendMessage(&irc.Message{
366 Command: "CAP",
367 Params: []string{"REQ", strings.Join(requestCaps, " ")},
368 })
369 }
370
371 if uc.requestSASL() {
372 break // we'll send CAP END after authentication is completed
373 }
374
375 uc.SendMessage(&irc.Message{
376 Command: "CAP",
377 Params: []string{"END"},
378 })
379 case "ACK", "NAK":
380 if len(subParams) < 1 {
381 return newNeedMoreParamsError(msg.Command)
382 }
383 caps := strings.Fields(subParams[0])
384
385 for _, name := range caps {
386 if err := uc.handleCapAck(strings.ToLower(name), subCmd == "ACK"); err != nil {
387 return err
388 }
389 }
390
391 if uc.saslClient == nil {
392 uc.SendMessage(&irc.Message{
393 Command: "CAP",
394 Params: []string{"END"},
395 })
396 }
397 default:
398 uc.logger.Printf("unhandled message: %v", msg)
399 }
400 case "AUTHENTICATE":
401 if uc.saslClient == nil {
402 return fmt.Errorf("received unexpected AUTHENTICATE message")
403 }
404
405 // TODO: if a challenge is 400 bytes long, buffer it
406 var challengeStr string
407 if err := parseMessageParams(msg, &challengeStr); err != nil {
408 uc.SendMessage(&irc.Message{
409 Command: "AUTHENTICATE",
410 Params: []string{"*"},
411 })
412 return err
413 }
414
415 var challenge []byte
416 if challengeStr != "+" {
417 var err error
418 challenge, err = base64.StdEncoding.DecodeString(challengeStr)
419 if err != nil {
420 uc.SendMessage(&irc.Message{
421 Command: "AUTHENTICATE",
422 Params: []string{"*"},
423 })
424 return err
425 }
426 }
427
428 var resp []byte
429 var err error
430 if !uc.saslStarted {
431 _, resp, err = uc.saslClient.Start()
432 uc.saslStarted = true
433 } else {
434 resp, err = uc.saslClient.Next(challenge)
435 }
436 if err != nil {
437 uc.SendMessage(&irc.Message{
438 Command: "AUTHENTICATE",
439 Params: []string{"*"},
440 })
441 return err
442 }
443
444 // TODO: send response in multiple chunks if >= 400 bytes
445 var respStr = "+"
446 if resp != nil {
447 respStr = base64.StdEncoding.EncodeToString(resp)
448 }
449
450 uc.SendMessage(&irc.Message{
451 Command: "AUTHENTICATE",
452 Params: []string{respStr},
453 })
454 case irc.RPL_LOGGEDIN:
455 var account string
456 if err := parseMessageParams(msg, nil, nil, &account); err != nil {
457 return err
458 }
459 uc.logger.Printf("logged in with account %q", account)
460 case irc.RPL_LOGGEDOUT:
461 uc.logger.Printf("logged out")
462 case irc.ERR_NICKLOCKED, irc.RPL_SASLSUCCESS, irc.ERR_SASLFAIL, irc.ERR_SASLTOOLONG, irc.ERR_SASLABORTED:
463 var info string
464 if err := parseMessageParams(msg, nil, &info); err != nil {
465 return err
466 }
467 switch msg.Command {
468 case irc.ERR_NICKLOCKED:
469 uc.logger.Printf("invalid nick used with SASL authentication: %v", info)
470 case irc.ERR_SASLFAIL:
471 uc.logger.Printf("SASL authentication failed: %v", info)
472 case irc.ERR_SASLTOOLONG:
473 uc.logger.Printf("SASL message too long: %v", info)
474 }
475
476 uc.saslClient = nil
477 uc.saslStarted = false
478
479 uc.SendMessage(&irc.Message{
480 Command: "CAP",
481 Params: []string{"END"},
482 })
483 case irc.RPL_WELCOME:
484 uc.registered = true
485 uc.logger.Printf("connection registered")
486
487 channels, err := uc.srv.db.ListChannels(uc.network.ID)
488 if err != nil {
489 uc.logger.Printf("failed to list channels from database: %v", err)
490 break
491 }
492
493 for _, ch := range channels {
494 params := []string{ch.Name}
495 if ch.Key != "" {
496 params = append(params, ch.Key)
497 }
498 uc.SendMessage(&irc.Message{
499 Command: "JOIN",
500 Params: params,
501 })
502 }
503 case irc.RPL_MYINFO:
504 if err := parseMessageParams(msg, nil, &uc.serverName, nil, &uc.availableUserModes, nil); err != nil {
505 return err
506 }
507 case irc.RPL_ISUPPORT:
508 if err := parseMessageParams(msg, nil, nil); err != nil {
509 return err
510 }
511 for _, token := range msg.Params[1 : len(msg.Params)-1] {
512 negate := false
513 parameter := token
514 value := ""
515 if strings.HasPrefix(token, "-") {
516 negate = true
517 token = token[1:]
518 } else {
519 if i := strings.IndexByte(token, '='); i >= 0 {
520 parameter = token[:i]
521 value = token[i+1:]
522 }
523 }
524 if !negate {
525 switch parameter {
526 case "CHANMODES":
527 parts := strings.SplitN(value, ",", 5)
528 if len(parts) < 4 {
529 return fmt.Errorf("malformed ISUPPORT CHANMODES value: %v", value)
530 }
531 modes := make(map[byte]channelModeType)
532 for i, mt := range []channelModeType{modeTypeA, modeTypeB, modeTypeC, modeTypeD} {
533 for j := 0; j < len(parts[i]); j++ {
534 mode := parts[i][j]
535 modes[mode] = mt
536 }
537 }
538 uc.availableChannelModes = modes
539 case "CHANTYPES":
540 uc.availableChannelTypes = value
541 case "PREFIX":
542 if value == "" {
543 uc.availableMemberships = nil
544 } else {
545 if value[0] != '(' {
546 return fmt.Errorf("malformed ISUPPORT PREFIX value: %v", value)
547 }
548 sep := strings.IndexByte(value, ')')
549 if sep < 0 || len(value) != sep*2 {
550 return fmt.Errorf("malformed ISUPPORT PREFIX value: %v", value)
551 }
552 memberships := make([]membership, len(value)/2-1)
553 for i := range memberships {
554 memberships[i] = membership{
555 Mode: value[i+1],
556 Prefix: value[sep+i+1],
557 }
558 }
559 uc.availableMemberships = memberships
560 }
561 }
562 } else {
563 // TODO: handle ISUPPORT negations
564 }
565 }
566 case "BATCH":
567 var tag string
568 if err := parseMessageParams(msg, &tag); err != nil {
569 return err
570 }
571
572 if strings.HasPrefix(tag, "+") {
573 tag = tag[1:]
574 if _, ok := uc.batches[tag]; ok {
575 return fmt.Errorf("unexpected BATCH reference tag: batch was already defined: %q", tag)
576 }
577 var batchType string
578 if err := parseMessageParams(msg, nil, &batchType); err != nil {
579 return err
580 }
581 label := label
582 if label == "" && msgBatch != nil {
583 label = msgBatch.Label
584 }
585 uc.batches[tag] = batch{
586 Type: batchType,
587 Params: msg.Params[2:],
588 Outer: msgBatch,
589 Label: label,
590 }
591 } else if strings.HasPrefix(tag, "-") {
592 tag = tag[1:]
593 if _, ok := uc.batches[tag]; !ok {
594 return fmt.Errorf("unknown BATCH reference tag: %q", tag)
595 }
596 delete(uc.batches, tag)
597 } else {
598 return fmt.Errorf("unexpected BATCH reference tag: missing +/- prefix: %q", tag)
599 }
600 case "NICK":
601 if msg.Prefix == nil {
602 return fmt.Errorf("expected a prefix")
603 }
604
605 var newNick string
606 if err := parseMessageParams(msg, &newNick); err != nil {
607 return err
608 }
609
610 if msg.Prefix.Name == uc.nick {
611 uc.logger.Printf("changed nick from %q to %q", uc.nick, newNick)
612 uc.nick = newNick
613 }
614
615 for _, ch := range uc.channels {
616 if membership, ok := ch.Members[msg.Prefix.Name]; ok {
617 delete(ch.Members, msg.Prefix.Name)
618 ch.Members[newNick] = membership
619 }
620 }
621
622 if msg.Prefix.Name != uc.nick {
623 uc.forEachDownstream(func(dc *downstreamConn) {
624 dc.SendMessage(&irc.Message{
625 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
626 Command: "NICK",
627 Params: []string{newNick},
628 })
629 })
630 }
631 case "JOIN":
632 if msg.Prefix == nil {
633 return fmt.Errorf("expected a prefix")
634 }
635
636 var channels string
637 if err := parseMessageParams(msg, &channels); err != nil {
638 return err
639 }
640
641 for _, ch := range strings.Split(channels, ",") {
642 if msg.Prefix.Name == uc.nick {
643 uc.logger.Printf("joined channel %q", ch)
644 uc.channels[ch] = &upstreamChannel{
645 Name: ch,
646 conn: uc,
647 Members: make(map[string]*membership),
648 }
649
650 uc.SendMessage(&irc.Message{
651 Command: "MODE",
652 Params: []string{ch},
653 })
654 } else {
655 ch, err := uc.getChannel(ch)
656 if err != nil {
657 return err
658 }
659 ch.Members[msg.Prefix.Name] = nil
660 }
661
662 uc.forEachDownstream(func(dc *downstreamConn) {
663 dc.SendMessage(&irc.Message{
664 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
665 Command: "JOIN",
666 Params: []string{dc.marshalChannel(uc, ch)},
667 })
668 })
669 }
670 case "PART":
671 if msg.Prefix == nil {
672 return fmt.Errorf("expected a prefix")
673 }
674
675 var channels string
676 if err := parseMessageParams(msg, &channels); err != nil {
677 return err
678 }
679
680 for _, ch := range strings.Split(channels, ",") {
681 if msg.Prefix.Name == uc.nick {
682 uc.logger.Printf("parted channel %q", ch)
683 delete(uc.channels, ch)
684 } else {
685 ch, err := uc.getChannel(ch)
686 if err != nil {
687 return err
688 }
689 delete(ch.Members, msg.Prefix.Name)
690 }
691
692 uc.forEachDownstream(func(dc *downstreamConn) {
693 dc.SendMessage(&irc.Message{
694 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
695 Command: "PART",
696 Params: []string{dc.marshalChannel(uc, ch)},
697 })
698 })
699 }
700 case "KICK":
701 if msg.Prefix == nil {
702 return fmt.Errorf("expected a prefix")
703 }
704
705 var channel, user string
706 if err := parseMessageParams(msg, &channel, &user); err != nil {
707 return err
708 }
709
710 var reason string
711 if len(msg.Params) > 2 {
712 reason = msg.Params[1]
713 }
714
715 if user == uc.nick {
716 uc.logger.Printf("kicked from channel %q by %s", channel, msg.Prefix.Name)
717 delete(uc.channels, channel)
718 } else {
719 ch, err := uc.getChannel(channel)
720 if err != nil {
721 return err
722 }
723 delete(ch.Members, user)
724 }
725
726 uc.forEachDownstream(func(dc *downstreamConn) {
727 params := []string{dc.marshalChannel(uc, channel), dc.marshalNick(uc, user)}
728 if reason != "" {
729 params = append(params, reason)
730 }
731 dc.SendMessage(&irc.Message{
732 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
733 Command: "KICK",
734 Params: params,
735 })
736 })
737 case "QUIT":
738 if msg.Prefix == nil {
739 return fmt.Errorf("expected a prefix")
740 }
741
742 if msg.Prefix.Name == uc.nick {
743 uc.logger.Printf("quit")
744 }
745
746 for _, ch := range uc.channels {
747 delete(ch.Members, msg.Prefix.Name)
748 }
749
750 if msg.Prefix.Name != uc.nick {
751 uc.forEachDownstream(func(dc *downstreamConn) {
752 dc.SendMessage(&irc.Message{
753 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
754 Command: "QUIT",
755 Params: msg.Params,
756 })
757 })
758 }
759 case irc.RPL_TOPIC, irc.RPL_NOTOPIC:
760 var name, topic string
761 if err := parseMessageParams(msg, nil, &name, &topic); err != nil {
762 return err
763 }
764 ch, err := uc.getChannel(name)
765 if err != nil {
766 return err
767 }
768 if msg.Command == irc.RPL_TOPIC {
769 ch.Topic = topic
770 } else {
771 ch.Topic = ""
772 }
773 case "TOPIC":
774 var name string
775 if err := parseMessageParams(msg, &name); err != nil {
776 return err
777 }
778 ch, err := uc.getChannel(name)
779 if err != nil {
780 return err
781 }
782 if len(msg.Params) > 1 {
783 ch.Topic = msg.Params[1]
784 } else {
785 ch.Topic = ""
786 }
787 uc.forEachDownstream(func(dc *downstreamConn) {
788 params := []string{dc.marshalChannel(uc, name)}
789 if ch.Topic != "" {
790 params = append(params, ch.Topic)
791 }
792 dc.SendMessage(&irc.Message{
793 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
794 Command: "TOPIC",
795 Params: params,
796 })
797 })
798 case "MODE":
799 var name, modeStr string
800 if err := parseMessageParams(msg, &name, &modeStr); err != nil {
801 return err
802 }
803
804 if !uc.isChannel(name) { // user mode change
805 if name != uc.nick {
806 return fmt.Errorf("received MODE message for unknown nick %q", name)
807 }
808 return uc.modes.Apply(modeStr)
809 // TODO: notify downstreams about user mode change?
810 } else { // channel mode change
811 ch, err := uc.getChannel(name)
812 if err != nil {
813 return err
814 }
815
816 if ch.modes != nil {
817 if err := ch.modes.Apply(uc.availableChannelModes, modeStr, msg.Params[2:]...); err != nil {
818 return err
819 }
820 }
821
822 uc.forEachDownstream(func(dc *downstreamConn) {
823 params := []string{dc.marshalChannel(uc, name), modeStr}
824 params = append(params, msg.Params[2:]...)
825
826 dc.SendMessage(&irc.Message{
827 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
828 Command: "MODE",
829 Params: params,
830 })
831 })
832 }
833 case irc.RPL_UMODEIS:
834 if err := parseMessageParams(msg, nil); err != nil {
835 return err
836 }
837 modeStr := ""
838 if len(msg.Params) > 1 {
839 modeStr = msg.Params[1]
840 }
841
842 uc.modes = ""
843 if err := uc.modes.Apply(modeStr); err != nil {
844 return err
845 }
846 // TODO: send RPL_UMODEIS to downstream connections when applicable
847 case irc.RPL_CHANNELMODEIS:
848 var channel string
849 if err := parseMessageParams(msg, nil, &channel); err != nil {
850 return err
851 }
852 modeStr := ""
853 if len(msg.Params) > 2 {
854 modeStr = msg.Params[2]
855 }
856
857 ch, err := uc.getChannel(channel)
858 if err != nil {
859 return err
860 }
861
862 firstMode := ch.modes == nil
863 ch.modes = make(map[byte]string)
864 if err := ch.modes.Apply(uc.availableChannelModes, modeStr, msg.Params[3:]...); err != nil {
865 return err
866 }
867 if firstMode {
868 modeStr, modeParams := ch.modes.Format()
869
870 uc.forEachDownstream(func(dc *downstreamConn) {
871 params := []string{dc.nick, dc.marshalChannel(uc, channel), modeStr}
872 params = append(params, modeParams...)
873
874 dc.SendMessage(&irc.Message{
875 Prefix: dc.srv.prefix(),
876 Command: irc.RPL_CHANNELMODEIS,
877 Params: params,
878 })
879 })
880 }
881 case rpl_creationtime:
882 var channel, creationTime string
883 if err := parseMessageParams(msg, nil, &channel, &creationTime); err != nil {
884 return err
885 }
886
887 ch, err := uc.getChannel(channel)
888 if err != nil {
889 return err
890 }
891
892 firstCreationTime := ch.creationTime == ""
893 ch.creationTime = creationTime
894 if firstCreationTime {
895 uc.forEachDownstream(func(dc *downstreamConn) {
896 dc.SendMessage(&irc.Message{
897 Prefix: dc.srv.prefix(),
898 Command: rpl_creationtime,
899 Params: []string{dc.nick, channel, creationTime},
900 })
901 })
902 }
903 case rpl_topicwhotime:
904 var name, who, timeStr string
905 if err := parseMessageParams(msg, nil, &name, &who, &timeStr); err != nil {
906 return err
907 }
908 ch, err := uc.getChannel(name)
909 if err != nil {
910 return err
911 }
912 ch.TopicWho = who
913 sec, err := strconv.ParseInt(timeStr, 10, 64)
914 if err != nil {
915 return fmt.Errorf("failed to parse topic time: %v", err)
916 }
917 ch.TopicTime = time.Unix(sec, 0)
918 case irc.RPL_LIST:
919 var channel, clients, topic string
920 if err := parseMessageParams(msg, nil, &channel, &clients, &topic); err != nil {
921 return err
922 }
923
924 pl := uc.getPendingList()
925 if pl == nil {
926 return fmt.Errorf("unexpected RPL_LIST: no matching pending LIST")
927 }
928
929 uc.forEachDownstreamByID(pl.downstreamID, func(dc *downstreamConn) {
930 dc.SendMessage(&irc.Message{
931 Prefix: dc.srv.prefix(),
932 Command: irc.RPL_LIST,
933 Params: []string{dc.nick, dc.marshalChannel(uc, channel), clients, topic},
934 })
935 })
936 case irc.RPL_LISTEND:
937 ok := uc.endPendingLists(false)
938 if !ok {
939 return fmt.Errorf("unexpected RPL_LISTEND: no matching pending LIST")
940 }
941 case irc.RPL_NAMREPLY:
942 var name, statusStr, members string
943 if err := parseMessageParams(msg, nil, &statusStr, &name, &members); err != nil {
944 return err
945 }
946
947 ch, ok := uc.channels[name]
948 if !ok {
949 // NAMES on a channel we have not joined, forward to downstream
950 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
951 channel := dc.marshalChannel(uc, name)
952 members := splitSpace(members)
953 for i, member := range members {
954 membership, nick := uc.parseMembershipPrefix(member)
955 members[i] = membership.String() + dc.marshalNick(uc, nick)
956 }
957 memberStr := strings.Join(members, " ")
958
959 dc.SendMessage(&irc.Message{
960 Prefix: dc.srv.prefix(),
961 Command: irc.RPL_NAMREPLY,
962 Params: []string{dc.nick, statusStr, channel, memberStr},
963 })
964 })
965 return nil
966 }
967
968 status, err := parseChannelStatus(statusStr)
969 if err != nil {
970 return err
971 }
972 ch.Status = status
973
974 for _, s := range splitSpace(members) {
975 membership, nick := uc.parseMembershipPrefix(s)
976 ch.Members[nick] = membership
977 }
978 case irc.RPL_ENDOFNAMES:
979 var name string
980 if err := parseMessageParams(msg, nil, &name); err != nil {
981 return err
982 }
983
984 ch, ok := uc.channels[name]
985 if !ok {
986 // NAMES on a channel we have not joined, forward to downstream
987 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
988 channel := dc.marshalChannel(uc, name)
989
990 dc.SendMessage(&irc.Message{
991 Prefix: dc.srv.prefix(),
992 Command: irc.RPL_ENDOFNAMES,
993 Params: []string{dc.nick, channel, "End of /NAMES list"},
994 })
995 })
996 return nil
997 }
998
999 if ch.complete {
1000 return fmt.Errorf("received unexpected RPL_ENDOFNAMES")
1001 }
1002 ch.complete = true
1003
1004 uc.forEachDownstream(func(dc *downstreamConn) {
1005 forwardChannel(dc, ch)
1006 })
1007 case irc.RPL_WHOREPLY:
1008 var channel, username, host, server, nick, mode, trailing string
1009 if err := parseMessageParams(msg, nil, &channel, &username, &host, &server, &nick, &mode, &trailing); err != nil {
1010 return err
1011 }
1012
1013 parts := strings.SplitN(trailing, " ", 2)
1014 if len(parts) != 2 {
1015 return fmt.Errorf("received malformed RPL_WHOREPLY: wrong trailing parameter: %s", trailing)
1016 }
1017 realname := parts[1]
1018 hops, err := strconv.Atoi(parts[0])
1019 if err != nil {
1020 return fmt.Errorf("received malformed RPL_WHOREPLY: wrong hop count: %s", parts[0])
1021 }
1022 hops++
1023
1024 trailing = strconv.Itoa(hops) + " " + realname
1025
1026 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1027 channel := channel
1028 if channel != "*" {
1029 channel = dc.marshalChannel(uc, channel)
1030 }
1031 nick := dc.marshalNick(uc, nick)
1032 dc.SendMessage(&irc.Message{
1033 Prefix: dc.srv.prefix(),
1034 Command: irc.RPL_WHOREPLY,
1035 Params: []string{dc.nick, channel, username, host, server, nick, mode, trailing},
1036 })
1037 })
1038 case irc.RPL_ENDOFWHO:
1039 var name string
1040 if err := parseMessageParams(msg, nil, &name); err != nil {
1041 return err
1042 }
1043
1044 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1045 name := name
1046 if name != "*" {
1047 // TODO: support WHO masks
1048 name = dc.marshalEntity(uc, name)
1049 }
1050 dc.SendMessage(&irc.Message{
1051 Prefix: dc.srv.prefix(),
1052 Command: irc.RPL_ENDOFWHO,
1053 Params: []string{dc.nick, name, "End of /WHO list"},
1054 })
1055 })
1056 case irc.RPL_WHOISUSER:
1057 var nick, username, host, realname string
1058 if err := parseMessageParams(msg, nil, &nick, &username, &host, nil, &realname); err != nil {
1059 return err
1060 }
1061
1062 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1063 nick := dc.marshalNick(uc, nick)
1064 dc.SendMessage(&irc.Message{
1065 Prefix: dc.srv.prefix(),
1066 Command: irc.RPL_WHOISUSER,
1067 Params: []string{dc.nick, nick, username, host, "*", realname},
1068 })
1069 })
1070 case irc.RPL_WHOISSERVER:
1071 var nick, server, serverInfo string
1072 if err := parseMessageParams(msg, nil, &nick, &server, &serverInfo); err != nil {
1073 return err
1074 }
1075
1076 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1077 nick := dc.marshalNick(uc, nick)
1078 dc.SendMessage(&irc.Message{
1079 Prefix: dc.srv.prefix(),
1080 Command: irc.RPL_WHOISSERVER,
1081 Params: []string{dc.nick, nick, server, serverInfo},
1082 })
1083 })
1084 case irc.RPL_WHOISOPERATOR:
1085 var nick string
1086 if err := parseMessageParams(msg, nil, &nick); err != nil {
1087 return err
1088 }
1089
1090 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1091 nick := dc.marshalNick(uc, nick)
1092 dc.SendMessage(&irc.Message{
1093 Prefix: dc.srv.prefix(),
1094 Command: irc.RPL_WHOISOPERATOR,
1095 Params: []string{dc.nick, nick, "is an IRC operator"},
1096 })
1097 })
1098 case irc.RPL_WHOISIDLE:
1099 var nick string
1100 if err := parseMessageParams(msg, nil, &nick, nil); err != nil {
1101 return err
1102 }
1103
1104 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1105 nick := dc.marshalNick(uc, nick)
1106 params := []string{dc.nick, nick}
1107 params = append(params, msg.Params[2:]...)
1108 dc.SendMessage(&irc.Message{
1109 Prefix: dc.srv.prefix(),
1110 Command: irc.RPL_WHOISIDLE,
1111 Params: params,
1112 })
1113 })
1114 case irc.RPL_WHOISCHANNELS:
1115 var nick, channelList string
1116 if err := parseMessageParams(msg, nil, &nick, &channelList); err != nil {
1117 return err
1118 }
1119 channels := splitSpace(channelList)
1120
1121 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1122 nick := dc.marshalNick(uc, nick)
1123 channelList := make([]string, len(channels))
1124 for i, channel := range channels {
1125 prefix, channel := uc.parseMembershipPrefix(channel)
1126 channel = dc.marshalChannel(uc, channel)
1127 channelList[i] = prefix.String() + channel
1128 }
1129 channels := strings.Join(channelList, " ")
1130 dc.SendMessage(&irc.Message{
1131 Prefix: dc.srv.prefix(),
1132 Command: irc.RPL_WHOISCHANNELS,
1133 Params: []string{dc.nick, nick, channels},
1134 })
1135 })
1136 case irc.RPL_ENDOFWHOIS:
1137 var nick string
1138 if err := parseMessageParams(msg, nil, &nick); err != nil {
1139 return err
1140 }
1141
1142 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1143 nick := dc.marshalNick(uc, nick)
1144 dc.SendMessage(&irc.Message{
1145 Prefix: dc.srv.prefix(),
1146 Command: irc.RPL_ENDOFWHOIS,
1147 Params: []string{dc.nick, nick, "End of /WHOIS list"},
1148 })
1149 })
1150 case "PRIVMSG":
1151 if msg.Prefix == nil {
1152 return fmt.Errorf("expected a prefix")
1153 }
1154
1155 var nick string
1156 if err := parseMessageParams(msg, &nick, nil); err != nil {
1157 return err
1158 }
1159
1160 if msg.Prefix.Name == serviceNick {
1161 uc.logger.Printf("skipping PRIVMSG from soju's service: %v", msg)
1162 break
1163 }
1164 if nick == serviceNick {
1165 uc.logger.Printf("skipping PRIVMSG to soju's service: %v", msg)
1166 break
1167 }
1168
1169 uc.network.ring.Produce(msg)
1170 case "INVITE":
1171 var nick string
1172 var channel string
1173 if err := parseMessageParams(msg, &nick, &channel); err != nil {
1174 return err
1175 }
1176
1177 uc.forEachDownstream(func(dc *downstreamConn) {
1178 dc.SendMessage(&irc.Message{
1179 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
1180 Command: "INVITE",
1181 Params: []string{dc.marshalNick(uc, nick), dc.marshalChannel(uc, channel)},
1182 })
1183 })
1184 case irc.RPL_INVITING:
1185 var nick string
1186 var channel string
1187 if err := parseMessageParams(msg, &nick, &channel); err != nil {
1188 return err
1189 }
1190
1191 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1192 dc.SendMessage(&irc.Message{
1193 Prefix: dc.srv.prefix(),
1194 Command: irc.RPL_INVITING,
1195 Params: []string{dc.nick, dc.marshalNick(uc, nick), dc.marshalChannel(uc, channel)},
1196 })
1197 })
1198 case irc.ERR_UNKNOWNCOMMAND, irc.RPL_TRYAGAIN:
1199 var command, reason string
1200 if err := parseMessageParams(msg, nil, &command, &reason); err != nil {
1201 return err
1202 }
1203
1204 if command == "LIST" {
1205 ok := uc.endPendingLists(false)
1206 if !ok {
1207 return fmt.Errorf("unexpected response for LIST: %q: no matching pending LIST", msg.Command)
1208 }
1209 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1210 dc.SendMessage(&irc.Message{
1211 Prefix: uc.srv.prefix(),
1212 Command: msg.Command,
1213 Params: []string{dc.nick, "LIST", reason},
1214 })
1215 })
1216 }
1217 case "TAGMSG":
1218 // TODO: relay to downstream connections that accept message-tags
1219 case "ACK":
1220 // Ignore
1221 case irc.RPL_YOURHOST, irc.RPL_CREATED:
1222 // Ignore
1223 case irc.RPL_LUSERCLIENT, irc.RPL_LUSEROP, irc.RPL_LUSERUNKNOWN, irc.RPL_LUSERCHANNELS, irc.RPL_LUSERME:
1224 // Ignore
1225 case irc.RPL_MOTDSTART, irc.RPL_MOTD, irc.RPL_ENDOFMOTD:
1226 // Ignore
1227 case irc.RPL_LISTSTART:
1228 // Ignore
1229 case rpl_localusers, rpl_globalusers:
1230 // Ignore
1231 case irc.RPL_STATSVLINE, rpl_statsping, irc.RPL_STATSBLINE, irc.RPL_STATSDLINE:
1232 // Ignore
1233 default:
1234 uc.logger.Printf("unhandled message: %v", msg)
1235 }
1236 return nil
1237}
1238
// splitSpace cuts s into the fields separated by ASCII space characters
// (U+0020 only; tabs and other whitespace are kept inside fields). Runs of
// consecutive spaces, as well as leading and trailing spaces, never yield
// empty fields. The returned slice is always non-nil.
func splitSpace(s string) []string {
	pieces := strings.Split(s, " ")
	fields := make([]string, 0, len(pieces))
	for _, piece := range pieces {
		// Empty pieces come from adjacent, leading or trailing spaces.
		if piece != "" {
			fields = append(fields, piece)
		}
	}
	return fields
}
1244
1245func (uc *upstreamConn) register() {
1246 uc.nick = uc.network.Nick
1247 uc.username = uc.network.Username
1248 if uc.username == "" {
1249 uc.username = uc.nick
1250 }
1251 uc.realname = uc.network.Realname
1252 if uc.realname == "" {
1253 uc.realname = uc.nick
1254 }
1255
1256 uc.SendMessage(&irc.Message{
1257 Command: "CAP",
1258 Params: []string{"LS", "302"},
1259 })
1260
1261 if uc.network.Pass != "" {
1262 uc.SendMessage(&irc.Message{
1263 Command: "PASS",
1264 Params: []string{uc.network.Pass},
1265 })
1266 }
1267
1268 uc.SendMessage(&irc.Message{
1269 Command: "NICK",
1270 Params: []string{uc.nick},
1271 })
1272 uc.SendMessage(&irc.Message{
1273 Command: "USER",
1274 Params: []string{uc.username, "0", "*", uc.realname},
1275 })
1276}
1277
1278func (uc *upstreamConn) requestSASL() bool {
1279 if uc.network.SASL.Mechanism == "" {
1280 return false
1281 }
1282
1283 v, ok := uc.caps["sasl"]
1284 if !ok {
1285 return false
1286 }
1287 if v != "" {
1288 mechanisms := strings.Split(v, ",")
1289 found := false
1290 for _, mech := range mechanisms {
1291 if strings.EqualFold(mech, uc.network.SASL.Mechanism) {
1292 found = true
1293 break
1294 }
1295 }
1296 if !found {
1297 return false
1298 }
1299 }
1300
1301 return true
1302}
1303
1304func (uc *upstreamConn) handleCapAck(name string, ok bool) error {
1305 auth := &uc.network.SASL
1306 switch name {
1307 case "sasl":
1308 if !ok {
1309 uc.logger.Printf("server refused to acknowledge the SASL capability")
1310 return nil
1311 }
1312
1313 switch auth.Mechanism {
1314 case "PLAIN":
1315 uc.logger.Printf("starting SASL PLAIN authentication with username %q", auth.Plain.Username)
1316 uc.saslClient = sasl.NewPlainClient("", auth.Plain.Username, auth.Plain.Password)
1317 default:
1318 return fmt.Errorf("unsupported SASL mechanism %q", name)
1319 }
1320
1321 uc.SendMessage(&irc.Message{
1322 Command: "AUTHENTICATE",
1323 Params: []string{auth.Mechanism},
1324 })
1325 case "message-tags":
1326 uc.tagsSupported = ok
1327 case "labeled-response":
1328 uc.labelsSupported = ok
1329 }
1330 return nil
1331}
1332
1333func (uc *upstreamConn) readMessages(ch chan<- event) error {
1334 for {
1335 msg, err := uc.irc.ReadMessage()
1336 if err == io.EOF {
1337 break
1338 } else if err != nil {
1339 return fmt.Errorf("failed to read IRC command: %v", err)
1340 }
1341
1342 if uc.srv.Debug {
1343 uc.logger.Printf("received: %v", msg)
1344 }
1345
1346 ch <- eventUpstreamMessage{msg, uc}
1347 }
1348
1349 return nil
1350}
1351
1352func (uc *upstreamConn) SendMessage(msg *irc.Message) {
1353 uc.outgoing <- msg
1354}
1355
1356func (uc *upstreamConn) SendMessageLabeled(downstreamID uint64, msg *irc.Message) {
1357 if uc.labelsSupported {
1358 if msg.Tags == nil {
1359 msg.Tags = make(map[string]irc.TagValue)
1360 }
1361 msg.Tags["label"] = irc.TagValue(fmt.Sprintf("sd-%d-%d", downstreamID, uc.nextLabelID))
1362 uc.nextLabelID++
1363 }
1364 uc.SendMessage(msg)
1365}
Note: See TracBrowser for help on using the repository browser.