source: code/trunk/upstream.go@ 217

Last change on this file since 217 was 217, checked in by contact, 5 years ago

Add NOTICE messages to ring buffer

References: https://todo.sr.ht/~emersion/soju/33

File size: 34.0 KB
RevLine 
[98]1package soju
[13]2
3import (
4 "crypto/tls"
[95]5 "encoding/base64"
[155]6 "errors"
[13]7 "fmt"
8 "io"
9 "net"
[178]10 "os"
[19]11 "strconv"
[17]12 "strings"
[19]13 "time"
[13]14
[95]15 "github.com/emersion/go-sasl"
[13]16 "gopkg.in/irc.v3"
17)
18
[19]19type upstreamChannel struct {
[162]20 Name string
21 conn *upstreamConn
22 Topic string
23 TopicWho string
24 TopicTime time.Time
25 Status channelStatus
26 modes channelModes
27 creationTime string
28 Members map[string]*membership
29 complete bool
[19]30}
31
[13]32type upstreamConn struct {
[210]33 conn
[16]34
[210]35 network *network
36 user *user
37
[16]38 serverName string
39 availableUserModes string
[139]40 availableChannelModes map[byte]channelModeType
41 availableChannelTypes string
42 availableMemberships []membership
[19]43
44 registered bool
[42]45 nick string
[77]46 username string
47 realname string
[139]48 modes userModes
[19]49 channels map[string]*upstreamChannel
[92]50 caps map[string]string
[153]51 batches map[string]batch
[198]52 away bool
[95]53
[155]54 tagsSupported bool
55 labelsSupported bool
[161]56 nextLabelID uint64
[152]57
[95]58 saslClient sasl.Client
59 saslStarted bool
[177]60
61 // set of LIST commands in progress, per downstream
62 pendingLISTDownstreamSet map[uint64]struct{}
[178]63
[215]64 messageLoggers map[string]*messageLogger
[13]65}
66
[178]67type entityLog struct {
68 name string
69 file *os.File
70}
71
[77]72func connectToUpstream(network *network) (*upstreamConn, error) {
73 logger := &prefixLogger{network.user.srv.Logger, fmt.Sprintf("upstream %q: ", network.Addr)}
[33]74
[77]75 addr := network.Addr
76 if !strings.ContainsRune(addr, ':') {
77 addr = addr + ":6697"
78 }
79
[206]80 dialer := net.Dialer{Timeout: connectTimeout}
81
[77]82 logger.Printf("connecting to TLS server at address %q", addr)
[206]83 netConn, err := tls.DialWithDialer(&dialer, "tcp", addr, nil)
[33]84 if err != nil {
[77]85 return nil, fmt.Errorf("failed to dial %q: %v", addr, err)
[33]86 }
87
[55]88 uc := &upstreamConn{
[210]89 conn: *newConn(network.user.srv, netConn, logger),
[177]90 network: network,
91 user: network.user,
92 channels: make(map[string]*upstreamChannel),
93 caps: make(map[string]string),
94 batches: make(map[string]batch),
95 availableChannelTypes: stdChannelTypes,
96 availableChannelModes: stdChannelModes,
97 availableMemberships: stdMemberships,
98 pendingLISTDownstreamSet: make(map[uint64]struct{}),
[215]99 messageLoggers: make(map[string]*messageLogger),
[33]100 }
101
[55]102 return uc, nil
[33]103}
104
[73]105func (uc *upstreamConn) forEachDownstream(f func(*downstreamConn)) {
106 uc.user.forEachDownstream(func(dc *downstreamConn) {
[77]107 if dc.network != nil && dc.network != uc.network {
[73]108 return
109 }
110 f(dc)
111 })
112}
113
[161]114func (uc *upstreamConn) forEachDownstreamByID(id uint64, f func(*downstreamConn)) {
[155]115 uc.forEachDownstream(func(dc *downstreamConn) {
116 if id != 0 && id != dc.id {
117 return
118 }
119 f(dc)
120 })
121}
122
[55]123func (uc *upstreamConn) getChannel(name string) (*upstreamChannel, error) {
124 ch, ok := uc.channels[name]
[19]125 if !ok {
126 return nil, fmt.Errorf("unknown channel %q", name)
127 }
128 return ch, nil
129}
130
[129]131func (uc *upstreamConn) isChannel(entity string) bool {
[139]132 if i := strings.IndexByte(uc.availableChannelTypes, entity[0]); i >= 0 {
133 return true
[129]134 }
135 return false
136}
137
[181]138func (uc *upstreamConn) getPendingLIST() *pendingLIST {
[177]139 for _, pl := range uc.user.pendingLISTs {
140 if _, ok := pl.pendingCommands[uc.network.ID]; !ok {
141 continue
142 }
143 return &pl
144 }
145 return nil
146}
147
[181]148func (uc *upstreamConn) endPendingLISTs(all bool) (found bool) {
[177]149 found = false
150 for i := 0; i < len(uc.user.pendingLISTs); i++ {
151 pl := uc.user.pendingLISTs[i]
152 if _, ok := pl.pendingCommands[uc.network.ID]; !ok {
153 continue
154 }
155 delete(pl.pendingCommands, uc.network.ID)
156 if len(pl.pendingCommands) == 0 {
157 uc.user.pendingLISTs = append(uc.user.pendingLISTs[:i], uc.user.pendingLISTs[i+1:]...)
158 i--
159 uc.forEachDownstreamByID(pl.downstreamID, func(dc *downstreamConn) {
160 dc.SendMessage(&irc.Message{
161 Prefix: dc.srv.prefix(),
162 Command: irc.RPL_LISTEND,
163 Params: []string{dc.nick, "End of /LIST"},
164 })
165 })
166 }
167 found = true
168 if !all {
169 delete(uc.pendingLISTDownstreamSet, pl.downstreamID)
170 uc.user.forEachUpstream(func(uc *upstreamConn) {
[181]171 uc.trySendLIST(pl.downstreamID)
[177]172 })
173 return
174 }
175 }
176 return
177}
178
[181]179func (uc *upstreamConn) trySendLIST(downstreamID uint64) {
[177]180 // must be called with a lock in uc.user.pendingLISTsLock
181
182 if _, ok := uc.pendingLISTDownstreamSet[downstreamID]; ok {
183 // a LIST command is already pending
184 // we will try again when that command is completed
185 return
186 }
187
188 for _, pl := range uc.user.pendingLISTs {
189 if pl.downstreamID != downstreamID {
190 continue
191 }
192 // this is the first pending LIST command list of the downstream
193 listCommand, ok := pl.pendingCommands[uc.network.ID]
194 if !ok {
195 // there is no command for this upstream in these LIST commands
196 // do not send anything
197 continue
198 }
199 // there is a command for this upstream in these LIST commands
200 // send it now
201
202 uc.SendMessageLabeled(downstreamID, listCommand)
203
204 uc.pendingLISTDownstreamSet[downstreamID] = struct{}{}
205 return
206 }
207}
208
[139]209func (uc *upstreamConn) parseMembershipPrefix(s string) (membership *membership, nick string) {
210 for _, m := range uc.availableMemberships {
211 if m.Prefix == s[0] {
212 return &m, s[1:]
213 }
214 }
215 return nil, s
216}
217
[55]218func (uc *upstreamConn) handleMessage(msg *irc.Message) error {
[155]219 var label string
220 if l, ok := msg.GetTag("label"); ok {
221 label = l
222 }
223
[153]224 var msgBatch *batch
225 if batchName, ok := msg.GetTag("batch"); ok {
226 b, ok := uc.batches[batchName]
227 if !ok {
228 return fmt.Errorf("unexpected batch reference: batch was not defined: %q", batchName)
229 }
230 msgBatch = &b
[155]231 if label == "" {
232 label = msgBatch.Label
233 }
[153]234 }
235
[161]236 var downstreamID uint64 = 0
[155]237 if label != "" {
238 var labelOffset uint64
[161]239 n, err := fmt.Sscanf(label, "sd-%d-%d", &downstreamID, &labelOffset)
[155]240 if err == nil && n < 2 {
241 err = errors.New("not enough arguments")
242 }
243 if err != nil {
244 return fmt.Errorf("unexpected message label: invalid downstream reference for label %q: %v", label, err)
245 }
246 }
247
[216]248 if _, ok := msg.Tags["time"]; !ok {
249 msg.Tags["time"] = irc.TagValue(time.Now().Format(serverTimeLayout))
250 }
251
[13]252 switch msg.Command {
253 case "PING":
[60]254 uc.SendMessage(&irc.Message{
[13]255 Command: "PONG",
[68]256 Params: msg.Params,
[60]257 })
[33]258 return nil
[18]259 case "NOTICE":
[171]260 if msg.Prefix.User == "" && msg.Prefix.Host == "" { // server message
[217]261 uc.network.ring.Produce(msg)
[171]262 } else { // regular user NOTICE
[217]263 var entity, text string
264 if err := parseMessageParams(msg, &entity, &text); err != nil {
[171]265 return err
266 }
267
[217]268 target := entity
269 if target == uc.nick {
[178]270 target = msg.Prefix.Name
271 }
[215]272 uc.appendLog(target, msg)
[178]273
[217]274 uc.network.ring.Produce(msg)
[171]275 }
[92]276 case "CAP":
[95]277 var subCmd string
278 if err := parseMessageParams(msg, nil, &subCmd); err != nil {
279 return err
[92]280 }
[95]281 subCmd = strings.ToUpper(subCmd)
282 subParams := msg.Params[2:]
283 switch subCmd {
284 case "LS":
285 if len(subParams) < 1 {
286 return newNeedMoreParamsError(msg.Command)
287 }
288 caps := strings.Fields(subParams[len(subParams)-1])
289 more := len(subParams) >= 2 && msg.Params[len(subParams)-2] == "*"
[92]290
[95]291 for _, s := range caps {
292 kv := strings.SplitN(s, "=", 2)
293 k := strings.ToLower(kv[0])
294 var v string
295 if len(kv) == 2 {
296 v = kv[1]
297 }
298 uc.caps[k] = v
[92]299 }
300
[95]301 if more {
302 break // wait to receive all capabilities
303 }
304
[152]305 requestCaps := make([]string, 0, 16)
[193]306 for _, c := range []string{"message-tags", "batch", "labeled-response", "server-time"} {
[152]307 if _, ok := uc.caps[c]; ok {
308 requestCaps = append(requestCaps, c)
309 }
310 }
311
[95]312 if uc.requestSASL() {
[152]313 requestCaps = append(requestCaps, "sasl")
314 }
315
316 if len(requestCaps) > 0 {
[95]317 uc.SendMessage(&irc.Message{
318 Command: "CAP",
[152]319 Params: []string{"REQ", strings.Join(requestCaps, " ")},
[95]320 })
[152]321 }
322
323 if uc.requestSASL() {
[95]324 break // we'll send CAP END after authentication is completed
325 }
326
[92]327 uc.SendMessage(&irc.Message{
328 Command: "CAP",
329 Params: []string{"END"},
330 })
[95]331 case "ACK", "NAK":
332 if len(subParams) < 1 {
333 return newNeedMoreParamsError(msg.Command)
334 }
335 caps := strings.Fields(subParams[0])
336
337 for _, name := range caps {
338 if err := uc.handleCapAck(strings.ToLower(name), subCmd == "ACK"); err != nil {
339 return err
340 }
341 }
342
343 if uc.saslClient == nil {
344 uc.SendMessage(&irc.Message{
345 Command: "CAP",
346 Params: []string{"END"},
347 })
348 }
349 default:
350 uc.logger.Printf("unhandled message: %v", msg)
[92]351 }
[95]352 case "AUTHENTICATE":
353 if uc.saslClient == nil {
354 return fmt.Errorf("received unexpected AUTHENTICATE message")
355 }
356
357 // TODO: if a challenge is 400 bytes long, buffer it
358 var challengeStr string
359 if err := parseMessageParams(msg, &challengeStr); err != nil {
360 uc.SendMessage(&irc.Message{
361 Command: "AUTHENTICATE",
362 Params: []string{"*"},
363 })
364 return err
365 }
366
367 var challenge []byte
368 if challengeStr != "+" {
369 var err error
370 challenge, err = base64.StdEncoding.DecodeString(challengeStr)
371 if err != nil {
372 uc.SendMessage(&irc.Message{
373 Command: "AUTHENTICATE",
374 Params: []string{"*"},
375 })
376 return err
377 }
378 }
379
380 var resp []byte
381 var err error
382 if !uc.saslStarted {
383 _, resp, err = uc.saslClient.Start()
384 uc.saslStarted = true
385 } else {
386 resp, err = uc.saslClient.Next(challenge)
387 }
388 if err != nil {
389 uc.SendMessage(&irc.Message{
390 Command: "AUTHENTICATE",
391 Params: []string{"*"},
392 })
393 return err
394 }
395
396 // TODO: send response in multiple chunks if >= 400 bytes
397 var respStr = "+"
398 if resp != nil {
399 respStr = base64.StdEncoding.EncodeToString(resp)
400 }
401
402 uc.SendMessage(&irc.Message{
403 Command: "AUTHENTICATE",
404 Params: []string{respStr},
405 })
[125]406 case irc.RPL_LOGGEDIN:
[95]407 var account string
408 if err := parseMessageParams(msg, nil, nil, &account); err != nil {
409 return err
410 }
411 uc.logger.Printf("logged in with account %q", account)
[125]412 case irc.RPL_LOGGEDOUT:
[95]413 uc.logger.Printf("logged out")
[125]414 case irc.ERR_NICKLOCKED, irc.RPL_SASLSUCCESS, irc.ERR_SASLFAIL, irc.ERR_SASLTOOLONG, irc.ERR_SASLABORTED:
[95]415 var info string
416 if err := parseMessageParams(msg, nil, &info); err != nil {
417 return err
418 }
419 switch msg.Command {
[125]420 case irc.ERR_NICKLOCKED:
[95]421 uc.logger.Printf("invalid nick used with SASL authentication: %v", info)
[125]422 case irc.ERR_SASLFAIL:
[95]423 uc.logger.Printf("SASL authentication failed: %v", info)
[125]424 case irc.ERR_SASLTOOLONG:
[95]425 uc.logger.Printf("SASL message too long: %v", info)
426 }
427
428 uc.saslClient = nil
429 uc.saslStarted = false
430
431 uc.SendMessage(&irc.Message{
432 Command: "CAP",
433 Params: []string{"END"},
434 })
[14]435 case irc.RPL_WELCOME:
[55]436 uc.registered = true
437 uc.logger.Printf("connection registered")
[19]438
[77]439 channels, err := uc.srv.db.ListChannels(uc.network.ID)
440 if err != nil {
441 uc.logger.Printf("failed to list channels from database: %v", err)
442 break
443 }
444
445 for _, ch := range channels {
[146]446 params := []string{ch.Name}
447 if ch.Key != "" {
448 params = append(params, ch.Key)
449 }
[60]450 uc.SendMessage(&irc.Message{
[19]451 Command: "JOIN",
[146]452 Params: params,
[60]453 })
[19]454 }
[16]455 case irc.RPL_MYINFO:
[139]456 if err := parseMessageParams(msg, nil, &uc.serverName, nil, &uc.availableUserModes, nil); err != nil {
[43]457 return err
[16]458 }
[139]459 case irc.RPL_ISUPPORT:
460 if err := parseMessageParams(msg, nil, nil); err != nil {
461 return err
[16]462 }
[139]463 for _, token := range msg.Params[1 : len(msg.Params)-1] {
464 negate := false
465 parameter := token
466 value := ""
467 if strings.HasPrefix(token, "-") {
468 negate = true
469 token = token[1:]
470 } else {
471 if i := strings.IndexByte(token, '='); i >= 0 {
472 parameter = token[:i]
473 value = token[i+1:]
474 }
475 }
476 if !negate {
477 switch parameter {
478 case "CHANMODES":
479 parts := strings.SplitN(value, ",", 5)
480 if len(parts) < 4 {
481 return fmt.Errorf("malformed ISUPPORT CHANMODES value: %v", value)
482 }
483 modes := make(map[byte]channelModeType)
484 for i, mt := range []channelModeType{modeTypeA, modeTypeB, modeTypeC, modeTypeD} {
485 for j := 0; j < len(parts[i]); j++ {
486 mode := parts[i][j]
487 modes[mode] = mt
488 }
489 }
490 uc.availableChannelModes = modes
491 case "CHANTYPES":
492 uc.availableChannelTypes = value
493 case "PREFIX":
494 if value == "" {
495 uc.availableMemberships = nil
496 } else {
497 if value[0] != '(' {
498 return fmt.Errorf("malformed ISUPPORT PREFIX value: %v", value)
499 }
500 sep := strings.IndexByte(value, ')')
501 if sep < 0 || len(value) != sep*2 {
502 return fmt.Errorf("malformed ISUPPORT PREFIX value: %v", value)
503 }
504 memberships := make([]membership, len(value)/2-1)
505 for i := range memberships {
506 memberships[i] = membership{
507 Mode: value[i+1],
508 Prefix: value[sep+i+1],
509 }
510 }
511 uc.availableMemberships = memberships
512 }
513 }
514 } else {
515 // TODO: handle ISUPPORT negations
516 }
517 }
[153]518 case "BATCH":
519 var tag string
520 if err := parseMessageParams(msg, &tag); err != nil {
521 return err
522 }
523
524 if strings.HasPrefix(tag, "+") {
525 tag = tag[1:]
526 if _, ok := uc.batches[tag]; ok {
527 return fmt.Errorf("unexpected BATCH reference tag: batch was already defined: %q", tag)
528 }
529 var batchType string
530 if err := parseMessageParams(msg, nil, &batchType); err != nil {
531 return err
532 }
[155]533 label := label
534 if label == "" && msgBatch != nil {
535 label = msgBatch.Label
536 }
[153]537 uc.batches[tag] = batch{
538 Type: batchType,
539 Params: msg.Params[2:],
540 Outer: msgBatch,
[155]541 Label: label,
[153]542 }
543 } else if strings.HasPrefix(tag, "-") {
544 tag = tag[1:]
545 if _, ok := uc.batches[tag]; !ok {
546 return fmt.Errorf("unknown BATCH reference tag: %q", tag)
547 }
548 delete(uc.batches, tag)
549 } else {
550 return fmt.Errorf("unexpected BATCH reference tag: missing +/- prefix: %q", tag)
551 }
[42]552 case "NICK":
[83]553 if msg.Prefix == nil {
554 return fmt.Errorf("expected a prefix")
555 }
556
[43]557 var newNick string
558 if err := parseMessageParams(msg, &newNick); err != nil {
559 return err
[42]560 }
561
[55]562 if msg.Prefix.Name == uc.nick {
563 uc.logger.Printf("changed nick from %q to %q", uc.nick, newNick)
564 uc.nick = newNick
[42]565 }
566
[55]567 for _, ch := range uc.channels {
[42]568 if membership, ok := ch.Members[msg.Prefix.Name]; ok {
569 delete(ch.Members, msg.Prefix.Name)
570 ch.Members[newNick] = membership
[215]571 uc.appendLog(ch.Name, msg)
[42]572 }
573 }
[82]574
575 if msg.Prefix.Name != uc.nick {
576 uc.forEachDownstream(func(dc *downstreamConn) {
577 dc.SendMessage(&irc.Message{
578 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
579 Command: "NICK",
580 Params: []string{newNick},
581 })
582 })
583 }
[69]584 case "JOIN":
585 if msg.Prefix == nil {
586 return fmt.Errorf("expected a prefix")
587 }
[42]588
[43]589 var channels string
590 if err := parseMessageParams(msg, &channels); err != nil {
591 return err
[19]592 }
[34]593
[43]594 for _, ch := range strings.Split(channels, ",") {
[55]595 if msg.Prefix.Name == uc.nick {
596 uc.logger.Printf("joined channel %q", ch)
597 uc.channels[ch] = &upstreamChannel{
[34]598 Name: ch,
[55]599 conn: uc,
[139]600 Members: make(map[string]*membership),
[34]601 }
[139]602
603 uc.SendMessage(&irc.Message{
604 Command: "MODE",
605 Params: []string{ch},
606 })
[34]607 } else {
[55]608 ch, err := uc.getChannel(ch)
[34]609 if err != nil {
610 return err
611 }
[139]612 ch.Members[msg.Prefix.Name] = nil
[19]613 }
[69]614
[215]615 uc.appendLog(ch, msg)
[178]616
[73]617 uc.forEachDownstream(func(dc *downstreamConn) {
[69]618 dc.SendMessage(&irc.Message{
619 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
620 Command: "JOIN",
621 Params: []string{dc.marshalChannel(uc, ch)},
622 })
623 })
[19]624 }
[69]625 case "PART":
626 if msg.Prefix == nil {
627 return fmt.Errorf("expected a prefix")
628 }
[34]629
[43]630 var channels string
631 if err := parseMessageParams(msg, &channels); err != nil {
632 return err
[34]633 }
634
[178]635 var reason string
636 if len(msg.Params) > 1 {
637 reason = msg.Params[1]
638 }
639
[43]640 for _, ch := range strings.Split(channels, ",") {
[55]641 if msg.Prefix.Name == uc.nick {
642 uc.logger.Printf("parted channel %q", ch)
643 delete(uc.channels, ch)
[34]644 } else {
[55]645 ch, err := uc.getChannel(ch)
[34]646 if err != nil {
647 return err
648 }
649 delete(ch.Members, msg.Prefix.Name)
650 }
[69]651
[215]652 uc.appendLog(ch, msg)
[178]653
[73]654 uc.forEachDownstream(func(dc *downstreamConn) {
[215]655 params := []string{dc.marshalChannel(uc, ch)}
656 if reason != "" {
657 params = append(params, reason)
658 }
[69]659 dc.SendMessage(&irc.Message{
660 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
661 Command: "PART",
[215]662 Params: params,
[69]663 })
664 })
[34]665 }
[159]666 case "KICK":
667 if msg.Prefix == nil {
668 return fmt.Errorf("expected a prefix")
669 }
670
671 var channel, user string
672 if err := parseMessageParams(msg, &channel, &user); err != nil {
673 return err
674 }
675
676 var reason string
677 if len(msg.Params) > 2 {
[215]678 reason = msg.Params[2]
[159]679 }
680
681 if user == uc.nick {
682 uc.logger.Printf("kicked from channel %q by %s", channel, msg.Prefix.Name)
683 delete(uc.channels, channel)
684 } else {
685 ch, err := uc.getChannel(channel)
686 if err != nil {
687 return err
688 }
689 delete(ch.Members, user)
690 }
691
[215]692 uc.appendLog(channel, msg)
[178]693
[159]694 uc.forEachDownstream(func(dc *downstreamConn) {
695 params := []string{dc.marshalChannel(uc, channel), dc.marshalNick(uc, user)}
696 if reason != "" {
697 params = append(params, reason)
698 }
699 dc.SendMessage(&irc.Message{
700 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
701 Command: "KICK",
702 Params: params,
703 })
704 })
[83]705 case "QUIT":
706 if msg.Prefix == nil {
707 return fmt.Errorf("expected a prefix")
708 }
709
710 if msg.Prefix.Name == uc.nick {
711 uc.logger.Printf("quit")
712 }
713
714 for _, ch := range uc.channels {
[178]715 if _, ok := ch.Members[msg.Prefix.Name]; ok {
716 delete(ch.Members, msg.Prefix.Name)
717
[215]718 uc.appendLog(ch.Name, msg)
[178]719 }
[83]720 }
721
722 if msg.Prefix.Name != uc.nick {
723 uc.forEachDownstream(func(dc *downstreamConn) {
724 dc.SendMessage(&irc.Message{
725 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
726 Command: "QUIT",
727 Params: msg.Params,
728 })
729 })
730 }
[19]731 case irc.RPL_TOPIC, irc.RPL_NOTOPIC:
[43]732 var name, topic string
733 if err := parseMessageParams(msg, nil, &name, &topic); err != nil {
734 return err
[19]735 }
[55]736 ch, err := uc.getChannel(name)
[19]737 if err != nil {
738 return err
739 }
740 if msg.Command == irc.RPL_TOPIC {
[43]741 ch.Topic = topic
[19]742 } else {
743 ch.Topic = ""
744 }
745 case "TOPIC":
[43]746 var name string
[74]747 if err := parseMessageParams(msg, &name); err != nil {
[43]748 return err
[19]749 }
[55]750 ch, err := uc.getChannel(name)
[19]751 if err != nil {
752 return err
753 }
754 if len(msg.Params) > 1 {
755 ch.Topic = msg.Params[1]
756 } else {
757 ch.Topic = ""
758 }
[74]759 uc.forEachDownstream(func(dc *downstreamConn) {
760 params := []string{dc.marshalChannel(uc, name)}
761 if ch.Topic != "" {
762 params = append(params, ch.Topic)
763 }
764 dc.SendMessage(&irc.Message{
765 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
766 Command: "TOPIC",
767 Params: params,
768 })
769 })
[139]770 case "MODE":
771 var name, modeStr string
772 if err := parseMessageParams(msg, &name, &modeStr); err != nil {
773 return err
774 }
775
776 if !uc.isChannel(name) { // user mode change
777 if name != uc.nick {
778 return fmt.Errorf("received MODE message for unknown nick %q", name)
779 }
780 return uc.modes.Apply(modeStr)
781 // TODO: notify downstreams about user mode change?
782 } else { // channel mode change
783 ch, err := uc.getChannel(name)
784 if err != nil {
785 return err
786 }
787
788 if ch.modes != nil {
789 if err := ch.modes.Apply(uc.availableChannelModes, modeStr, msg.Params[2:]...); err != nil {
790 return err
791 }
792 }
793
[215]794 uc.appendLog(ch.Name, msg)
[178]795
[139]796 uc.forEachDownstream(func(dc *downstreamConn) {
797 params := []string{dc.marshalChannel(uc, name), modeStr}
798 params = append(params, msg.Params[2:]...)
799
800 dc.SendMessage(&irc.Message{
801 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
802 Command: "MODE",
803 Params: params,
804 })
805 })
806 }
807 case irc.RPL_UMODEIS:
808 if err := parseMessageParams(msg, nil); err != nil {
809 return err
810 }
811 modeStr := ""
812 if len(msg.Params) > 1 {
813 modeStr = msg.Params[1]
814 }
815
816 uc.modes = ""
817 if err := uc.modes.Apply(modeStr); err != nil {
818 return err
819 }
820 // TODO: send RPL_UMODEIS to downstream connections when applicable
821 case irc.RPL_CHANNELMODEIS:
822 var channel string
823 if err := parseMessageParams(msg, nil, &channel); err != nil {
824 return err
825 }
826 modeStr := ""
827 if len(msg.Params) > 2 {
828 modeStr = msg.Params[2]
829 }
830
831 ch, err := uc.getChannel(channel)
832 if err != nil {
833 return err
834 }
835
836 firstMode := ch.modes == nil
837 ch.modes = make(map[byte]string)
838 if err := ch.modes.Apply(uc.availableChannelModes, modeStr, msg.Params[3:]...); err != nil {
839 return err
840 }
841 if firstMode {
842 modeStr, modeParams := ch.modes.Format()
843
844 uc.forEachDownstream(func(dc *downstreamConn) {
845 params := []string{dc.nick, dc.marshalChannel(uc, channel), modeStr}
846 params = append(params, modeParams...)
847
848 dc.SendMessage(&irc.Message{
849 Prefix: dc.srv.prefix(),
850 Command: irc.RPL_CHANNELMODEIS,
851 Params: params,
852 })
853 })
854 }
[162]855 case rpl_creationtime:
856 var channel, creationTime string
857 if err := parseMessageParams(msg, nil, &channel, &creationTime); err != nil {
858 return err
859 }
860
861 ch, err := uc.getChannel(channel)
862 if err != nil {
863 return err
864 }
865
866 firstCreationTime := ch.creationTime == ""
867 ch.creationTime = creationTime
868 if firstCreationTime {
869 uc.forEachDownstream(func(dc *downstreamConn) {
870 dc.SendMessage(&irc.Message{
871 Prefix: dc.srv.prefix(),
872 Command: rpl_creationtime,
873 Params: []string{dc.nick, channel, creationTime},
874 })
875 })
876 }
[19]877 case rpl_topicwhotime:
[43]878 var name, who, timeStr string
879 if err := parseMessageParams(msg, nil, &name, &who, &timeStr); err != nil {
880 return err
[19]881 }
[55]882 ch, err := uc.getChannel(name)
[19]883 if err != nil {
884 return err
885 }
[43]886 ch.TopicWho = who
887 sec, err := strconv.ParseInt(timeStr, 10, 64)
[19]888 if err != nil {
889 return fmt.Errorf("failed to parse topic time: %v", err)
890 }
891 ch.TopicTime = time.Unix(sec, 0)
[177]892 case irc.RPL_LIST:
893 var channel, clients, topic string
894 if err := parseMessageParams(msg, nil, &channel, &clients, &topic); err != nil {
895 return err
896 }
897
[181]898 pl := uc.getPendingLIST()
[177]899 if pl == nil {
900 return fmt.Errorf("unexpected RPL_LIST: no matching pending LIST")
901 }
902
903 uc.forEachDownstreamByID(pl.downstreamID, func(dc *downstreamConn) {
904 dc.SendMessage(&irc.Message{
905 Prefix: dc.srv.prefix(),
906 Command: irc.RPL_LIST,
907 Params: []string{dc.nick, dc.marshalChannel(uc, channel), clients, topic},
908 })
909 })
910 case irc.RPL_LISTEND:
[181]911 ok := uc.endPendingLISTs(false)
[177]912 if !ok {
913 return fmt.Errorf("unexpected RPL_LISTEND: no matching pending LIST")
914 }
[19]915 case irc.RPL_NAMREPLY:
[43]916 var name, statusStr, members string
917 if err := parseMessageParams(msg, nil, &statusStr, &name, &members); err != nil {
918 return err
[19]919 }
[140]920
921 ch, ok := uc.channels[name]
922 if !ok {
923 // NAMES on a channel we have not joined, forward to downstream
[161]924 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
[140]925 channel := dc.marshalChannel(uc, name)
[174]926 members := splitSpace(members)
[140]927 for i, member := range members {
928 membership, nick := uc.parseMembershipPrefix(member)
929 members[i] = membership.String() + dc.marshalNick(uc, nick)
930 }
931 memberStr := strings.Join(members, " ")
932
933 dc.SendMessage(&irc.Message{
934 Prefix: dc.srv.prefix(),
935 Command: irc.RPL_NAMREPLY,
936 Params: []string{dc.nick, statusStr, channel, memberStr},
937 })
938 })
939 return nil
[19]940 }
941
[43]942 status, err := parseChannelStatus(statusStr)
[19]943 if err != nil {
944 return err
945 }
946 ch.Status = status
947
[174]948 for _, s := range splitSpace(members) {
[139]949 membership, nick := uc.parseMembershipPrefix(s)
[19]950 ch.Members[nick] = membership
951 }
952 case irc.RPL_ENDOFNAMES:
[43]953 var name string
954 if err := parseMessageParams(msg, nil, &name); err != nil {
955 return err
[25]956 }
[140]957
958 ch, ok := uc.channels[name]
959 if !ok {
960 // NAMES on a channel we have not joined, forward to downstream
[161]961 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
[140]962 channel := dc.marshalChannel(uc, name)
963
964 dc.SendMessage(&irc.Message{
965 Prefix: dc.srv.prefix(),
966 Command: irc.RPL_ENDOFNAMES,
967 Params: []string{dc.nick, channel, "End of /NAMES list"},
968 })
969 })
970 return nil
[25]971 }
972
[34]973 if ch.complete {
974 return fmt.Errorf("received unexpected RPL_ENDOFNAMES")
975 }
[25]976 ch.complete = true
[27]977
[73]978 uc.forEachDownstream(func(dc *downstreamConn) {
[27]979 forwardChannel(dc, ch)
[40]980 })
[127]981 case irc.RPL_WHOREPLY:
982 var channel, username, host, server, nick, mode, trailing string
983 if err := parseMessageParams(msg, nil, &channel, &username, &host, &server, &nick, &mode, &trailing); err != nil {
984 return err
985 }
986
987 parts := strings.SplitN(trailing, " ", 2)
988 if len(parts) != 2 {
989 return fmt.Errorf("received malformed RPL_WHOREPLY: wrong trailing parameter: %s", trailing)
990 }
991 realname := parts[1]
992 hops, err := strconv.Atoi(parts[0])
993 if err != nil {
994 return fmt.Errorf("received malformed RPL_WHOREPLY: wrong hop count: %s", parts[0])
995 }
996 hops++
997
998 trailing = strconv.Itoa(hops) + " " + realname
999
[161]1000 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
[127]1001 channel := channel
1002 if channel != "*" {
1003 channel = dc.marshalChannel(uc, channel)
1004 }
1005 nick := dc.marshalNick(uc, nick)
1006 dc.SendMessage(&irc.Message{
1007 Prefix: dc.srv.prefix(),
1008 Command: irc.RPL_WHOREPLY,
1009 Params: []string{dc.nick, channel, username, host, server, nick, mode, trailing},
1010 })
1011 })
1012 case irc.RPL_ENDOFWHO:
1013 var name string
1014 if err := parseMessageParams(msg, nil, &name); err != nil {
1015 return err
1016 }
1017
[161]1018 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
[127]1019 name := name
1020 if name != "*" {
1021 // TODO: support WHO masks
1022 name = dc.marshalEntity(uc, name)
1023 }
1024 dc.SendMessage(&irc.Message{
1025 Prefix: dc.srv.prefix(),
1026 Command: irc.RPL_ENDOFWHO,
[142]1027 Params: []string{dc.nick, name, "End of /WHO list"},
[127]1028 })
1029 })
[128]1030 case irc.RPL_WHOISUSER:
1031 var nick, username, host, realname string
1032 if err := parseMessageParams(msg, nil, &nick, &username, &host, nil, &realname); err != nil {
1033 return err
1034 }
1035
[161]1036 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
[128]1037 nick := dc.marshalNick(uc, nick)
1038 dc.SendMessage(&irc.Message{
1039 Prefix: dc.srv.prefix(),
1040 Command: irc.RPL_WHOISUSER,
1041 Params: []string{dc.nick, nick, username, host, "*", realname},
1042 })
1043 })
1044 case irc.RPL_WHOISSERVER:
1045 var nick, server, serverInfo string
1046 if err := parseMessageParams(msg, nil, &nick, &server, &serverInfo); err != nil {
1047 return err
1048 }
1049
[161]1050 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
[128]1051 nick := dc.marshalNick(uc, nick)
1052 dc.SendMessage(&irc.Message{
1053 Prefix: dc.srv.prefix(),
1054 Command: irc.RPL_WHOISSERVER,
1055 Params: []string{dc.nick, nick, server, serverInfo},
1056 })
1057 })
1058 case irc.RPL_WHOISOPERATOR:
1059 var nick string
1060 if err := parseMessageParams(msg, nil, &nick); err != nil {
1061 return err
1062 }
1063
[161]1064 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
[128]1065 nick := dc.marshalNick(uc, nick)
1066 dc.SendMessage(&irc.Message{
1067 Prefix: dc.srv.prefix(),
1068 Command: irc.RPL_WHOISOPERATOR,
1069 Params: []string{dc.nick, nick, "is an IRC operator"},
1070 })
1071 })
1072 case irc.RPL_WHOISIDLE:
1073 var nick string
1074 if err := parseMessageParams(msg, nil, &nick, nil); err != nil {
1075 return err
1076 }
1077
[161]1078 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
[128]1079 nick := dc.marshalNick(uc, nick)
1080 params := []string{dc.nick, nick}
1081 params = append(params, msg.Params[2:]...)
1082 dc.SendMessage(&irc.Message{
1083 Prefix: dc.srv.prefix(),
1084 Command: irc.RPL_WHOISIDLE,
1085 Params: params,
1086 })
1087 })
1088 case irc.RPL_WHOISCHANNELS:
1089 var nick, channelList string
1090 if err := parseMessageParams(msg, nil, &nick, &channelList); err != nil {
1091 return err
1092 }
[174]1093 channels := splitSpace(channelList)
[128]1094
[161]1095 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
[128]1096 nick := dc.marshalNick(uc, nick)
1097 channelList := make([]string, len(channels))
1098 for i, channel := range channels {
[139]1099 prefix, channel := uc.parseMembershipPrefix(channel)
[128]1100 channel = dc.marshalChannel(uc, channel)
1101 channelList[i] = prefix.String() + channel
1102 }
1103 channels := strings.Join(channelList, " ")
1104 dc.SendMessage(&irc.Message{
1105 Prefix: dc.srv.prefix(),
1106 Command: irc.RPL_WHOISCHANNELS,
1107 Params: []string{dc.nick, nick, channels},
1108 })
1109 })
1110 case irc.RPL_ENDOFWHOIS:
1111 var nick string
1112 if err := parseMessageParams(msg, nil, &nick); err != nil {
1113 return err
1114 }
1115
[161]1116 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
[128]1117 nick := dc.marshalNick(uc, nick)
1118 dc.SendMessage(&irc.Message{
1119 Prefix: dc.srv.prefix(),
1120 Command: irc.RPL_ENDOFWHOIS,
[142]1121 Params: []string{dc.nick, nick, "End of /WHOIS list"},
[128]1122 })
1123 })
[36]1124 case "PRIVMSG":
[117]1125 if msg.Prefix == nil {
1126 return fmt.Errorf("expected a prefix")
1127 }
1128
[217]1129 var entity, text string
1130 if err := parseMessageParams(msg, &entity, &text); err != nil {
[69]1131 return err
1132 }
[117]1133
1134 if msg.Prefix.Name == serviceNick {
1135 uc.logger.Printf("skipping PRIVMSG from soju's service: %v", msg)
1136 break
1137 }
[217]1138 if entity == serviceNick {
[117]1139 uc.logger.Printf("skipping PRIVMSG to soju's service: %v", msg)
1140 break
1141 }
1142
[217]1143 target := entity
1144 if target == uc.nick {
[178]1145 target = msg.Prefix.Name
1146 }
[215]1147 uc.appendLog(target, msg)
[178]1148
[143]1149 uc.network.ring.Produce(msg)
[115]1150 case "INVITE":
1151 var nick string
1152 var channel string
1153 if err := parseMessageParams(msg, &nick, &channel); err != nil {
1154 return err
1155 }
1156
1157 uc.forEachDownstream(func(dc *downstreamConn) {
1158 dc.SendMessage(&irc.Message{
1159 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
1160 Command: "INVITE",
1161 Params: []string{dc.marshalNick(uc, nick), dc.marshalChannel(uc, channel)},
1162 })
1163 })
[163]1164 case irc.RPL_INVITING:
1165 var nick string
1166 var channel string
1167 if err := parseMessageParams(msg, &nick, &channel); err != nil {
1168 return err
1169 }
1170
1171 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1172 dc.SendMessage(&irc.Message{
1173 Prefix: dc.srv.prefix(),
1174 Command: irc.RPL_INVITING,
1175 Params: []string{dc.nick, dc.marshalNick(uc, nick), dc.marshalChannel(uc, channel)},
1176 })
1177 })
[177]1178 case irc.ERR_UNKNOWNCOMMAND, irc.RPL_TRYAGAIN:
1179 var command, reason string
1180 if err := parseMessageParams(msg, nil, &command, &reason); err != nil {
1181 return err
1182 }
1183
1184 if command == "LIST" {
[181]1185 ok := uc.endPendingLISTs(false)
[177]1186 if !ok {
1187 return fmt.Errorf("unexpected response for LIST: %q: no matching pending LIST", msg.Command)
1188 }
1189 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1190 dc.SendMessage(&irc.Message{
1191 Prefix: uc.srv.prefix(),
1192 Command: msg.Command,
1193 Params: []string{dc.nick, "LIST", reason},
1194 })
1195 })
1196 }
[152]1197 case "TAGMSG":
1198 // TODO: relay to downstream connections that accept message-tags
[155]1199 case "ACK":
1200 // Ignore
[198]1201 case irc.RPL_NOWAWAY, irc.RPL_UNAWAY:
1202 // Ignore
[16]1203 case irc.RPL_YOURHOST, irc.RPL_CREATED:
[14]1204 // Ignore
1205 case irc.RPL_LUSERCLIENT, irc.RPL_LUSEROP, irc.RPL_LUSERUNKNOWN, irc.RPL_LUSERCHANNELS, irc.RPL_LUSERME:
1206 // Ignore
1207 case irc.RPL_MOTDSTART, irc.RPL_MOTD, irc.RPL_ENDOFMOTD:
1208 // Ignore
[177]1209 case irc.RPL_LISTSTART:
1210 // Ignore
[14]1211 case rpl_localusers, rpl_globalusers:
1212 // Ignore
[96]1213 case irc.RPL_STATSVLINE, rpl_statsping, irc.RPL_STATSBLINE, irc.RPL_STATSDLINE:
[14]1214 // Ignore
[13]1215 default:
[95]1216 uc.logger.Printf("unhandled message: %v", msg)
[13]1217 }
[14]1218 return nil
[13]1219}
1220
[174]1221func splitSpace(s string) []string {
1222 return strings.FieldsFunc(s, func(r rune) bool {
1223 return r == ' '
1224 })
1225}
1226
[55]1227func (uc *upstreamConn) register() {
[77]1228 uc.nick = uc.network.Nick
1229 uc.username = uc.network.Username
1230 if uc.username == "" {
1231 uc.username = uc.nick
1232 }
1233 uc.realname = uc.network.Realname
1234 if uc.realname == "" {
1235 uc.realname = uc.nick
1236 }
1237
[60]1238 uc.SendMessage(&irc.Message{
[92]1239 Command: "CAP",
1240 Params: []string{"LS", "302"},
1241 })
1242
[93]1243 if uc.network.Pass != "" {
1244 uc.SendMessage(&irc.Message{
1245 Command: "PASS",
1246 Params: []string{uc.network.Pass},
1247 })
1248 }
1249
[92]1250 uc.SendMessage(&irc.Message{
[13]1251 Command: "NICK",
[69]1252 Params: []string{uc.nick},
[60]1253 })
1254 uc.SendMessage(&irc.Message{
[13]1255 Command: "USER",
[77]1256 Params: []string{uc.username, "0", "*", uc.realname},
[60]1257 })
[44]1258}
[13]1259
[197]1260func (uc *upstreamConn) runUntilRegistered() error {
1261 for !uc.registered {
[212]1262 msg, err := uc.ReadMessage()
[197]1263 if err != nil {
1264 return fmt.Errorf("failed to read message: %v", err)
1265 }
1266
1267 if err := uc.handleMessage(msg); err != nil {
1268 return fmt.Errorf("failed to handle message %q: %v", msg, err)
1269 }
1270 }
1271
1272 return nil
1273}
1274
[95]1275func (uc *upstreamConn) requestSASL() bool {
1276 if uc.network.SASL.Mechanism == "" {
1277 return false
1278 }
1279
1280 v, ok := uc.caps["sasl"]
1281 if !ok {
1282 return false
1283 }
1284 if v != "" {
1285 mechanisms := strings.Split(v, ",")
1286 found := false
1287 for _, mech := range mechanisms {
1288 if strings.EqualFold(mech, uc.network.SASL.Mechanism) {
1289 found = true
1290 break
1291 }
1292 }
1293 if !found {
1294 return false
1295 }
1296 }
1297
1298 return true
1299}
1300
1301func (uc *upstreamConn) handleCapAck(name string, ok bool) error {
1302 auth := &uc.network.SASL
1303 switch name {
1304 case "sasl":
1305 if !ok {
1306 uc.logger.Printf("server refused to acknowledge the SASL capability")
1307 return nil
1308 }
1309
1310 switch auth.Mechanism {
1311 case "PLAIN":
1312 uc.logger.Printf("starting SASL PLAIN authentication with username %q", auth.Plain.Username)
1313 uc.saslClient = sasl.NewPlainClient("", auth.Plain.Username, auth.Plain.Password)
1314 default:
1315 return fmt.Errorf("unsupported SASL mechanism %q", name)
1316 }
1317
1318 uc.SendMessage(&irc.Message{
1319 Command: "AUTHENTICATE",
1320 Params: []string{auth.Mechanism},
1321 })
[152]1322 case "message-tags":
1323 uc.tagsSupported = ok
[155]1324 case "labeled-response":
1325 uc.labelsSupported = ok
[193]1326 case "batch", "server-time":
1327 // Nothing to do
1328 default:
1329 uc.logger.Printf("received CAP ACK/NAK for a cap we don't support: %v", name)
[95]1330 }
1331 return nil
1332}
1333
[165]1334func (uc *upstreamConn) readMessages(ch chan<- event) error {
[13]1335 for {
[210]1336 msg, err := uc.ReadMessage()
[13]1337 if err == io.EOF {
1338 break
1339 } else if err != nil {
1340 return fmt.Errorf("failed to read IRC command: %v", err)
1341 }
1342
[165]1343 ch <- eventUpstreamMessage{msg, uc}
[13]1344 }
1345
[45]1346 return nil
[13]1347}
[60]1348
[176]1349func (uc *upstreamConn) SendMessageLabeled(downstreamID uint64, msg *irc.Message) {
[155]1350 if uc.labelsSupported {
1351 if msg.Tags == nil {
1352 msg.Tags = make(map[string]irc.TagValue)
1353 }
[176]1354 msg.Tags["label"] = irc.TagValue(fmt.Sprintf("sd-%d-%d", downstreamID, uc.nextLabelID))
[161]1355 uc.nextLabelID++
[155]1356 }
1357 uc.SendMessage(msg)
1358}
[178]1359
1360// TODO: handle moving logs when a network name changes, when support for this is added
[215]1361func (uc *upstreamConn) appendLog(entity string, msg *irc.Message) {
[178]1362 if uc.srv.LogPath == "" {
1363 return
1364 }
[215]1365
1366 ml, ok := uc.messageLoggers[entity]
1367 if !ok {
1368 ml = newMessageLogger(uc, entity)
1369 uc.messageLoggers[entity] = ml
[178]1370 }
1371
[215]1372 if err := ml.Append(msg); err != nil {
1373 uc.logger.Printf("failed to log message: %v", err)
[178]1374 }
1375}
[198]1376
1377func (uc *upstreamConn) updateAway() {
1378 away := true
1379 uc.forEachDownstream(func(*downstreamConn) {
1380 away = false
1381 })
1382 if away == uc.away {
1383 return
1384 }
1385 if away {
1386 uc.SendMessage(&irc.Message{
1387 Command: "AWAY",
1388 Params: []string{"Auto away"},
1389 })
1390 } else {
1391 uc.SendMessage(&irc.Message{
1392 Command: "AWAY",
1393 })
1394 }
1395 uc.away = away
1396}
Note: See TracBrowser for help on using the repository browser.