source: code/trunk/upstream.go@ 220

Last change on this file since 220 was 218, checked in by delthas, 5 years ago

Send one NOTICE on new upstream disconnect/connect errors

In order to notify the user when we are disconnected from a network
(either due to an error, or due to a QUIT), and when we fail reconnecting,
this commit adds support for sending a short NOTICE message from the
service user to all relevant downstreams.

The last error is stored, and cleared on successful connection, to
ensure that the user is *not* flooded with identical connection error
messages, which can often happen when a server is down.

No lock is needed on lastError because it is only read and modified from
the user goroutine.

Closes: https://todo.sr.ht/~emersion/soju/27

File size: 33.9 KB
RevLine 
[98]1package soju
[13]2
3import (
4 "crypto/tls"
[95]5 "encoding/base64"
[155]6 "errors"
[13]7 "fmt"
8 "io"
9 "net"
[178]10 "os"
[19]11 "strconv"
[17]12 "strings"
[19]13 "time"
[13]14
[95]15 "github.com/emersion/go-sasl"
[13]16 "gopkg.in/irc.v3"
17)
18
[19]19type upstreamChannel struct {
[162]20 Name string
21 conn *upstreamConn
22 Topic string
23 TopicWho string
24 TopicTime time.Time
25 Status channelStatus
26 modes channelModes
27 creationTime string
28 Members map[string]*membership
29 complete bool
[19]30}
31
[13]32type upstreamConn struct {
[210]33 conn
[16]34
[210]35 network *network
36 user *user
37
[16]38 serverName string
39 availableUserModes string
[139]40 availableChannelModes map[byte]channelModeType
41 availableChannelTypes string
42 availableMemberships []membership
[19]43
44 registered bool
[42]45 nick string
[77]46 username string
47 realname string
[139]48 modes userModes
[19]49 channels map[string]*upstreamChannel
[92]50 caps map[string]string
[153]51 batches map[string]batch
[198]52 away bool
[95]53
[155]54 tagsSupported bool
55 labelsSupported bool
[161]56 nextLabelID uint64
[152]57
[95]58 saslClient sasl.Client
59 saslStarted bool
[177]60
61 // set of LIST commands in progress, per downstream
62 pendingLISTDownstreamSet map[uint64]struct{}
[178]63
[215]64 messageLoggers map[string]*messageLogger
[13]65}
66
// entityLog pairs a name with an open file.
//
// NOTE(review): presumably the log file of one entity (channel or nick);
// nothing in the visible part of the file uses this type, so confirm before
// relying on that.
type entityLog struct {
	name string
	file *os.File
}
71
[77]72func connectToUpstream(network *network) (*upstreamConn, error) {
73 logger := &prefixLogger{network.user.srv.Logger, fmt.Sprintf("upstream %q: ", network.Addr)}
[33]74
[77]75 addr := network.Addr
76 if !strings.ContainsRune(addr, ':') {
77 addr = addr + ":6697"
78 }
79
[206]80 dialer := net.Dialer{Timeout: connectTimeout}
81
[77]82 logger.Printf("connecting to TLS server at address %q", addr)
[206]83 netConn, err := tls.DialWithDialer(&dialer, "tcp", addr, nil)
[33]84 if err != nil {
[77]85 return nil, fmt.Errorf("failed to dial %q: %v", addr, err)
[33]86 }
87
[55]88 uc := &upstreamConn{
[210]89 conn: *newConn(network.user.srv, netConn, logger),
[177]90 network: network,
91 user: network.user,
92 channels: make(map[string]*upstreamChannel),
93 caps: make(map[string]string),
94 batches: make(map[string]batch),
95 availableChannelTypes: stdChannelTypes,
96 availableChannelModes: stdChannelModes,
97 availableMemberships: stdMemberships,
98 pendingLISTDownstreamSet: make(map[uint64]struct{}),
[215]99 messageLoggers: make(map[string]*messageLogger),
[33]100 }
101
[55]102 return uc, nil
[33]103}
104
[73]105func (uc *upstreamConn) forEachDownstream(f func(*downstreamConn)) {
[218]106 uc.network.forEachDownstream(f)
[73]107}
108
[161]109func (uc *upstreamConn) forEachDownstreamByID(id uint64, f func(*downstreamConn)) {
[155]110 uc.forEachDownstream(func(dc *downstreamConn) {
111 if id != 0 && id != dc.id {
112 return
113 }
114 f(dc)
115 })
116}
117
[55]118func (uc *upstreamConn) getChannel(name string) (*upstreamChannel, error) {
119 ch, ok := uc.channels[name]
[19]120 if !ok {
121 return nil, fmt.Errorf("unknown channel %q", name)
122 }
123 return ch, nil
124}
125
[129]126func (uc *upstreamConn) isChannel(entity string) bool {
[139]127 if i := strings.IndexByte(uc.availableChannelTypes, entity[0]); i >= 0 {
128 return true
[129]129 }
130 return false
131}
132
[181]133func (uc *upstreamConn) getPendingLIST() *pendingLIST {
[177]134 for _, pl := range uc.user.pendingLISTs {
135 if _, ok := pl.pendingCommands[uc.network.ID]; !ok {
136 continue
137 }
138 return &pl
139 }
140 return nil
141}
142
[181]143func (uc *upstreamConn) endPendingLISTs(all bool) (found bool) {
[177]144 found = false
145 for i := 0; i < len(uc.user.pendingLISTs); i++ {
146 pl := uc.user.pendingLISTs[i]
147 if _, ok := pl.pendingCommands[uc.network.ID]; !ok {
148 continue
149 }
150 delete(pl.pendingCommands, uc.network.ID)
151 if len(pl.pendingCommands) == 0 {
152 uc.user.pendingLISTs = append(uc.user.pendingLISTs[:i], uc.user.pendingLISTs[i+1:]...)
153 i--
154 uc.forEachDownstreamByID(pl.downstreamID, func(dc *downstreamConn) {
155 dc.SendMessage(&irc.Message{
156 Prefix: dc.srv.prefix(),
157 Command: irc.RPL_LISTEND,
158 Params: []string{dc.nick, "End of /LIST"},
159 })
160 })
161 }
162 found = true
163 if !all {
164 delete(uc.pendingLISTDownstreamSet, pl.downstreamID)
165 uc.user.forEachUpstream(func(uc *upstreamConn) {
[181]166 uc.trySendLIST(pl.downstreamID)
[177]167 })
168 return
169 }
170 }
171 return
172}
173
[181]174func (uc *upstreamConn) trySendLIST(downstreamID uint64) {
[177]175 // must be called with a lock in uc.user.pendingLISTsLock
176
177 if _, ok := uc.pendingLISTDownstreamSet[downstreamID]; ok {
178 // a LIST command is already pending
179 // we will try again when that command is completed
180 return
181 }
182
183 for _, pl := range uc.user.pendingLISTs {
184 if pl.downstreamID != downstreamID {
185 continue
186 }
187 // this is the first pending LIST command list of the downstream
188 listCommand, ok := pl.pendingCommands[uc.network.ID]
189 if !ok {
190 // there is no command for this upstream in these LIST commands
191 // do not send anything
192 continue
193 }
194 // there is a command for this upstream in these LIST commands
195 // send it now
196
197 uc.SendMessageLabeled(downstreamID, listCommand)
198
199 uc.pendingLISTDownstreamSet[downstreamID] = struct{}{}
200 return
201 }
202}
203
[139]204func (uc *upstreamConn) parseMembershipPrefix(s string) (membership *membership, nick string) {
205 for _, m := range uc.availableMemberships {
206 if m.Prefix == s[0] {
207 return &m, s[1:]
208 }
209 }
210 return nil, s
211}
212
[55]213func (uc *upstreamConn) handleMessage(msg *irc.Message) error {
[155]214 var label string
215 if l, ok := msg.GetTag("label"); ok {
216 label = l
217 }
218
[153]219 var msgBatch *batch
220 if batchName, ok := msg.GetTag("batch"); ok {
221 b, ok := uc.batches[batchName]
222 if !ok {
223 return fmt.Errorf("unexpected batch reference: batch was not defined: %q", batchName)
224 }
225 msgBatch = &b
[155]226 if label == "" {
227 label = msgBatch.Label
228 }
[153]229 }
230
[161]231 var downstreamID uint64 = 0
[155]232 if label != "" {
233 var labelOffset uint64
[161]234 n, err := fmt.Sscanf(label, "sd-%d-%d", &downstreamID, &labelOffset)
[155]235 if err == nil && n < 2 {
236 err = errors.New("not enough arguments")
237 }
238 if err != nil {
239 return fmt.Errorf("unexpected message label: invalid downstream reference for label %q: %v", label, err)
240 }
241 }
242
[216]243 if _, ok := msg.Tags["time"]; !ok {
244 msg.Tags["time"] = irc.TagValue(time.Now().Format(serverTimeLayout))
245 }
246
[13]247 switch msg.Command {
248 case "PING":
[60]249 uc.SendMessage(&irc.Message{
[13]250 Command: "PONG",
[68]251 Params: msg.Params,
[60]252 })
[33]253 return nil
[18]254 case "NOTICE":
[171]255 if msg.Prefix.User == "" && msg.Prefix.Host == "" { // server message
[217]256 uc.network.ring.Produce(msg)
[171]257 } else { // regular user NOTICE
[217]258 var entity, text string
259 if err := parseMessageParams(msg, &entity, &text); err != nil {
[171]260 return err
261 }
262
[217]263 target := entity
264 if target == uc.nick {
[178]265 target = msg.Prefix.Name
266 }
[215]267 uc.appendLog(target, msg)
[178]268
[217]269 uc.network.ring.Produce(msg)
[171]270 }
[92]271 case "CAP":
[95]272 var subCmd string
273 if err := parseMessageParams(msg, nil, &subCmd); err != nil {
274 return err
[92]275 }
[95]276 subCmd = strings.ToUpper(subCmd)
277 subParams := msg.Params[2:]
278 switch subCmd {
279 case "LS":
280 if len(subParams) < 1 {
281 return newNeedMoreParamsError(msg.Command)
282 }
283 caps := strings.Fields(subParams[len(subParams)-1])
284 more := len(subParams) >= 2 && msg.Params[len(subParams)-2] == "*"
[92]285
[95]286 for _, s := range caps {
287 kv := strings.SplitN(s, "=", 2)
288 k := strings.ToLower(kv[0])
289 var v string
290 if len(kv) == 2 {
291 v = kv[1]
292 }
293 uc.caps[k] = v
[92]294 }
295
[95]296 if more {
297 break // wait to receive all capabilities
298 }
299
[152]300 requestCaps := make([]string, 0, 16)
[193]301 for _, c := range []string{"message-tags", "batch", "labeled-response", "server-time"} {
[152]302 if _, ok := uc.caps[c]; ok {
303 requestCaps = append(requestCaps, c)
304 }
305 }
306
[95]307 if uc.requestSASL() {
[152]308 requestCaps = append(requestCaps, "sasl")
309 }
310
311 if len(requestCaps) > 0 {
[95]312 uc.SendMessage(&irc.Message{
313 Command: "CAP",
[152]314 Params: []string{"REQ", strings.Join(requestCaps, " ")},
[95]315 })
[152]316 }
317
318 if uc.requestSASL() {
[95]319 break // we'll send CAP END after authentication is completed
320 }
321
[92]322 uc.SendMessage(&irc.Message{
323 Command: "CAP",
324 Params: []string{"END"},
325 })
[95]326 case "ACK", "NAK":
327 if len(subParams) < 1 {
328 return newNeedMoreParamsError(msg.Command)
329 }
330 caps := strings.Fields(subParams[0])
331
332 for _, name := range caps {
333 if err := uc.handleCapAck(strings.ToLower(name), subCmd == "ACK"); err != nil {
334 return err
335 }
336 }
337
338 if uc.saslClient == nil {
339 uc.SendMessage(&irc.Message{
340 Command: "CAP",
341 Params: []string{"END"},
342 })
343 }
344 default:
345 uc.logger.Printf("unhandled message: %v", msg)
[92]346 }
[95]347 case "AUTHENTICATE":
348 if uc.saslClient == nil {
349 return fmt.Errorf("received unexpected AUTHENTICATE message")
350 }
351
352 // TODO: if a challenge is 400 bytes long, buffer it
353 var challengeStr string
354 if err := parseMessageParams(msg, &challengeStr); err != nil {
355 uc.SendMessage(&irc.Message{
356 Command: "AUTHENTICATE",
357 Params: []string{"*"},
358 })
359 return err
360 }
361
362 var challenge []byte
363 if challengeStr != "+" {
364 var err error
365 challenge, err = base64.StdEncoding.DecodeString(challengeStr)
366 if err != nil {
367 uc.SendMessage(&irc.Message{
368 Command: "AUTHENTICATE",
369 Params: []string{"*"},
370 })
371 return err
372 }
373 }
374
375 var resp []byte
376 var err error
377 if !uc.saslStarted {
378 _, resp, err = uc.saslClient.Start()
379 uc.saslStarted = true
380 } else {
381 resp, err = uc.saslClient.Next(challenge)
382 }
383 if err != nil {
384 uc.SendMessage(&irc.Message{
385 Command: "AUTHENTICATE",
386 Params: []string{"*"},
387 })
388 return err
389 }
390
391 // TODO: send response in multiple chunks if >= 400 bytes
392 var respStr = "+"
393 if resp != nil {
394 respStr = base64.StdEncoding.EncodeToString(resp)
395 }
396
397 uc.SendMessage(&irc.Message{
398 Command: "AUTHENTICATE",
399 Params: []string{respStr},
400 })
[125]401 case irc.RPL_LOGGEDIN:
[95]402 var account string
403 if err := parseMessageParams(msg, nil, nil, &account); err != nil {
404 return err
405 }
406 uc.logger.Printf("logged in with account %q", account)
[125]407 case irc.RPL_LOGGEDOUT:
[95]408 uc.logger.Printf("logged out")
[125]409 case irc.ERR_NICKLOCKED, irc.RPL_SASLSUCCESS, irc.ERR_SASLFAIL, irc.ERR_SASLTOOLONG, irc.ERR_SASLABORTED:
[95]410 var info string
411 if err := parseMessageParams(msg, nil, &info); err != nil {
412 return err
413 }
414 switch msg.Command {
[125]415 case irc.ERR_NICKLOCKED:
[95]416 uc.logger.Printf("invalid nick used with SASL authentication: %v", info)
[125]417 case irc.ERR_SASLFAIL:
[95]418 uc.logger.Printf("SASL authentication failed: %v", info)
[125]419 case irc.ERR_SASLTOOLONG:
[95]420 uc.logger.Printf("SASL message too long: %v", info)
421 }
422
423 uc.saslClient = nil
424 uc.saslStarted = false
425
426 uc.SendMessage(&irc.Message{
427 Command: "CAP",
428 Params: []string{"END"},
429 })
[14]430 case irc.RPL_WELCOME:
[55]431 uc.registered = true
432 uc.logger.Printf("connection registered")
[19]433
[77]434 channels, err := uc.srv.db.ListChannels(uc.network.ID)
435 if err != nil {
436 uc.logger.Printf("failed to list channels from database: %v", err)
437 break
438 }
439
440 for _, ch := range channels {
[146]441 params := []string{ch.Name}
442 if ch.Key != "" {
443 params = append(params, ch.Key)
444 }
[60]445 uc.SendMessage(&irc.Message{
[19]446 Command: "JOIN",
[146]447 Params: params,
[60]448 })
[19]449 }
[16]450 case irc.RPL_MYINFO:
[139]451 if err := parseMessageParams(msg, nil, &uc.serverName, nil, &uc.availableUserModes, nil); err != nil {
[43]452 return err
[16]453 }
[139]454 case irc.RPL_ISUPPORT:
455 if err := parseMessageParams(msg, nil, nil); err != nil {
456 return err
[16]457 }
[139]458 for _, token := range msg.Params[1 : len(msg.Params)-1] {
459 negate := false
460 parameter := token
461 value := ""
462 if strings.HasPrefix(token, "-") {
463 negate = true
464 token = token[1:]
465 } else {
466 if i := strings.IndexByte(token, '='); i >= 0 {
467 parameter = token[:i]
468 value = token[i+1:]
469 }
470 }
471 if !negate {
472 switch parameter {
473 case "CHANMODES":
474 parts := strings.SplitN(value, ",", 5)
475 if len(parts) < 4 {
476 return fmt.Errorf("malformed ISUPPORT CHANMODES value: %v", value)
477 }
478 modes := make(map[byte]channelModeType)
479 for i, mt := range []channelModeType{modeTypeA, modeTypeB, modeTypeC, modeTypeD} {
480 for j := 0; j < len(parts[i]); j++ {
481 mode := parts[i][j]
482 modes[mode] = mt
483 }
484 }
485 uc.availableChannelModes = modes
486 case "CHANTYPES":
487 uc.availableChannelTypes = value
488 case "PREFIX":
489 if value == "" {
490 uc.availableMemberships = nil
491 } else {
492 if value[0] != '(' {
493 return fmt.Errorf("malformed ISUPPORT PREFIX value: %v", value)
494 }
495 sep := strings.IndexByte(value, ')')
496 if sep < 0 || len(value) != sep*2 {
497 return fmt.Errorf("malformed ISUPPORT PREFIX value: %v", value)
498 }
499 memberships := make([]membership, len(value)/2-1)
500 for i := range memberships {
501 memberships[i] = membership{
502 Mode: value[i+1],
503 Prefix: value[sep+i+1],
504 }
505 }
506 uc.availableMemberships = memberships
507 }
508 }
509 } else {
510 // TODO: handle ISUPPORT negations
511 }
512 }
[153]513 case "BATCH":
514 var tag string
515 if err := parseMessageParams(msg, &tag); err != nil {
516 return err
517 }
518
519 if strings.HasPrefix(tag, "+") {
520 tag = tag[1:]
521 if _, ok := uc.batches[tag]; ok {
522 return fmt.Errorf("unexpected BATCH reference tag: batch was already defined: %q", tag)
523 }
524 var batchType string
525 if err := parseMessageParams(msg, nil, &batchType); err != nil {
526 return err
527 }
[155]528 label := label
529 if label == "" && msgBatch != nil {
530 label = msgBatch.Label
531 }
[153]532 uc.batches[tag] = batch{
533 Type: batchType,
534 Params: msg.Params[2:],
535 Outer: msgBatch,
[155]536 Label: label,
[153]537 }
538 } else if strings.HasPrefix(tag, "-") {
539 tag = tag[1:]
540 if _, ok := uc.batches[tag]; !ok {
541 return fmt.Errorf("unknown BATCH reference tag: %q", tag)
542 }
543 delete(uc.batches, tag)
544 } else {
545 return fmt.Errorf("unexpected BATCH reference tag: missing +/- prefix: %q", tag)
546 }
[42]547 case "NICK":
[83]548 if msg.Prefix == nil {
549 return fmt.Errorf("expected a prefix")
550 }
551
[43]552 var newNick string
553 if err := parseMessageParams(msg, &newNick); err != nil {
554 return err
[42]555 }
556
[55]557 if msg.Prefix.Name == uc.nick {
558 uc.logger.Printf("changed nick from %q to %q", uc.nick, newNick)
559 uc.nick = newNick
[42]560 }
561
[55]562 for _, ch := range uc.channels {
[42]563 if membership, ok := ch.Members[msg.Prefix.Name]; ok {
564 delete(ch.Members, msg.Prefix.Name)
565 ch.Members[newNick] = membership
[215]566 uc.appendLog(ch.Name, msg)
[42]567 }
568 }
[82]569
570 if msg.Prefix.Name != uc.nick {
571 uc.forEachDownstream(func(dc *downstreamConn) {
572 dc.SendMessage(&irc.Message{
573 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
574 Command: "NICK",
575 Params: []string{newNick},
576 })
577 })
578 }
[69]579 case "JOIN":
580 if msg.Prefix == nil {
581 return fmt.Errorf("expected a prefix")
582 }
[42]583
[43]584 var channels string
585 if err := parseMessageParams(msg, &channels); err != nil {
586 return err
[19]587 }
[34]588
[43]589 for _, ch := range strings.Split(channels, ",") {
[55]590 if msg.Prefix.Name == uc.nick {
591 uc.logger.Printf("joined channel %q", ch)
592 uc.channels[ch] = &upstreamChannel{
[34]593 Name: ch,
[55]594 conn: uc,
[139]595 Members: make(map[string]*membership),
[34]596 }
[139]597
598 uc.SendMessage(&irc.Message{
599 Command: "MODE",
600 Params: []string{ch},
601 })
[34]602 } else {
[55]603 ch, err := uc.getChannel(ch)
[34]604 if err != nil {
605 return err
606 }
[139]607 ch.Members[msg.Prefix.Name] = nil
[19]608 }
[69]609
[215]610 uc.appendLog(ch, msg)
[178]611
[73]612 uc.forEachDownstream(func(dc *downstreamConn) {
[69]613 dc.SendMessage(&irc.Message{
614 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
615 Command: "JOIN",
616 Params: []string{dc.marshalChannel(uc, ch)},
617 })
618 })
[19]619 }
[69]620 case "PART":
621 if msg.Prefix == nil {
622 return fmt.Errorf("expected a prefix")
623 }
[34]624
[43]625 var channels string
626 if err := parseMessageParams(msg, &channels); err != nil {
627 return err
[34]628 }
629
[178]630 var reason string
631 if len(msg.Params) > 1 {
632 reason = msg.Params[1]
633 }
634
[43]635 for _, ch := range strings.Split(channels, ",") {
[55]636 if msg.Prefix.Name == uc.nick {
637 uc.logger.Printf("parted channel %q", ch)
638 delete(uc.channels, ch)
[34]639 } else {
[55]640 ch, err := uc.getChannel(ch)
[34]641 if err != nil {
642 return err
643 }
644 delete(ch.Members, msg.Prefix.Name)
645 }
[69]646
[215]647 uc.appendLog(ch, msg)
[178]648
[73]649 uc.forEachDownstream(func(dc *downstreamConn) {
[215]650 params := []string{dc.marshalChannel(uc, ch)}
651 if reason != "" {
652 params = append(params, reason)
653 }
[69]654 dc.SendMessage(&irc.Message{
655 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
656 Command: "PART",
[215]657 Params: params,
[69]658 })
659 })
[34]660 }
[159]661 case "KICK":
662 if msg.Prefix == nil {
663 return fmt.Errorf("expected a prefix")
664 }
665
666 var channel, user string
667 if err := parseMessageParams(msg, &channel, &user); err != nil {
668 return err
669 }
670
671 var reason string
672 if len(msg.Params) > 2 {
[215]673 reason = msg.Params[2]
[159]674 }
675
676 if user == uc.nick {
677 uc.logger.Printf("kicked from channel %q by %s", channel, msg.Prefix.Name)
678 delete(uc.channels, channel)
679 } else {
680 ch, err := uc.getChannel(channel)
681 if err != nil {
682 return err
683 }
684 delete(ch.Members, user)
685 }
686
[215]687 uc.appendLog(channel, msg)
[178]688
[159]689 uc.forEachDownstream(func(dc *downstreamConn) {
690 params := []string{dc.marshalChannel(uc, channel), dc.marshalNick(uc, user)}
691 if reason != "" {
692 params = append(params, reason)
693 }
694 dc.SendMessage(&irc.Message{
695 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
696 Command: "KICK",
697 Params: params,
698 })
699 })
[83]700 case "QUIT":
701 if msg.Prefix == nil {
702 return fmt.Errorf("expected a prefix")
703 }
704
705 if msg.Prefix.Name == uc.nick {
706 uc.logger.Printf("quit")
707 }
708
709 for _, ch := range uc.channels {
[178]710 if _, ok := ch.Members[msg.Prefix.Name]; ok {
711 delete(ch.Members, msg.Prefix.Name)
712
[215]713 uc.appendLog(ch.Name, msg)
[178]714 }
[83]715 }
716
717 if msg.Prefix.Name != uc.nick {
718 uc.forEachDownstream(func(dc *downstreamConn) {
719 dc.SendMessage(&irc.Message{
720 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
721 Command: "QUIT",
722 Params: msg.Params,
723 })
724 })
725 }
[19]726 case irc.RPL_TOPIC, irc.RPL_NOTOPIC:
[43]727 var name, topic string
728 if err := parseMessageParams(msg, nil, &name, &topic); err != nil {
729 return err
[19]730 }
[55]731 ch, err := uc.getChannel(name)
[19]732 if err != nil {
733 return err
734 }
735 if msg.Command == irc.RPL_TOPIC {
[43]736 ch.Topic = topic
[19]737 } else {
738 ch.Topic = ""
739 }
740 case "TOPIC":
[43]741 var name string
[74]742 if err := parseMessageParams(msg, &name); err != nil {
[43]743 return err
[19]744 }
[55]745 ch, err := uc.getChannel(name)
[19]746 if err != nil {
747 return err
748 }
749 if len(msg.Params) > 1 {
750 ch.Topic = msg.Params[1]
751 } else {
752 ch.Topic = ""
753 }
[74]754 uc.forEachDownstream(func(dc *downstreamConn) {
755 params := []string{dc.marshalChannel(uc, name)}
756 if ch.Topic != "" {
757 params = append(params, ch.Topic)
758 }
759 dc.SendMessage(&irc.Message{
760 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
761 Command: "TOPIC",
762 Params: params,
763 })
764 })
[139]765 case "MODE":
766 var name, modeStr string
767 if err := parseMessageParams(msg, &name, &modeStr); err != nil {
768 return err
769 }
770
771 if !uc.isChannel(name) { // user mode change
772 if name != uc.nick {
773 return fmt.Errorf("received MODE message for unknown nick %q", name)
774 }
775 return uc.modes.Apply(modeStr)
776 // TODO: notify downstreams about user mode change?
777 } else { // channel mode change
778 ch, err := uc.getChannel(name)
779 if err != nil {
780 return err
781 }
782
783 if ch.modes != nil {
784 if err := ch.modes.Apply(uc.availableChannelModes, modeStr, msg.Params[2:]...); err != nil {
785 return err
786 }
787 }
788
[215]789 uc.appendLog(ch.Name, msg)
[178]790
[139]791 uc.forEachDownstream(func(dc *downstreamConn) {
792 params := []string{dc.marshalChannel(uc, name), modeStr}
793 params = append(params, msg.Params[2:]...)
794
795 dc.SendMessage(&irc.Message{
796 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
797 Command: "MODE",
798 Params: params,
799 })
800 })
801 }
802 case irc.RPL_UMODEIS:
803 if err := parseMessageParams(msg, nil); err != nil {
804 return err
805 }
806 modeStr := ""
807 if len(msg.Params) > 1 {
808 modeStr = msg.Params[1]
809 }
810
811 uc.modes = ""
812 if err := uc.modes.Apply(modeStr); err != nil {
813 return err
814 }
815 // TODO: send RPL_UMODEIS to downstream connections when applicable
816 case irc.RPL_CHANNELMODEIS:
817 var channel string
818 if err := parseMessageParams(msg, nil, &channel); err != nil {
819 return err
820 }
821 modeStr := ""
822 if len(msg.Params) > 2 {
823 modeStr = msg.Params[2]
824 }
825
826 ch, err := uc.getChannel(channel)
827 if err != nil {
828 return err
829 }
830
831 firstMode := ch.modes == nil
832 ch.modes = make(map[byte]string)
833 if err := ch.modes.Apply(uc.availableChannelModes, modeStr, msg.Params[3:]...); err != nil {
834 return err
835 }
836 if firstMode {
837 modeStr, modeParams := ch.modes.Format()
838
839 uc.forEachDownstream(func(dc *downstreamConn) {
840 params := []string{dc.nick, dc.marshalChannel(uc, channel), modeStr}
841 params = append(params, modeParams...)
842
843 dc.SendMessage(&irc.Message{
844 Prefix: dc.srv.prefix(),
845 Command: irc.RPL_CHANNELMODEIS,
846 Params: params,
847 })
848 })
849 }
[162]850 case rpl_creationtime:
851 var channel, creationTime string
852 if err := parseMessageParams(msg, nil, &channel, &creationTime); err != nil {
853 return err
854 }
855
856 ch, err := uc.getChannel(channel)
857 if err != nil {
858 return err
859 }
860
861 firstCreationTime := ch.creationTime == ""
862 ch.creationTime = creationTime
863 if firstCreationTime {
864 uc.forEachDownstream(func(dc *downstreamConn) {
865 dc.SendMessage(&irc.Message{
866 Prefix: dc.srv.prefix(),
867 Command: rpl_creationtime,
868 Params: []string{dc.nick, channel, creationTime},
869 })
870 })
871 }
[19]872 case rpl_topicwhotime:
[43]873 var name, who, timeStr string
874 if err := parseMessageParams(msg, nil, &name, &who, &timeStr); err != nil {
875 return err
[19]876 }
[55]877 ch, err := uc.getChannel(name)
[19]878 if err != nil {
879 return err
880 }
[43]881 ch.TopicWho = who
882 sec, err := strconv.ParseInt(timeStr, 10, 64)
[19]883 if err != nil {
884 return fmt.Errorf("failed to parse topic time: %v", err)
885 }
886 ch.TopicTime = time.Unix(sec, 0)
[177]887 case irc.RPL_LIST:
888 var channel, clients, topic string
889 if err := parseMessageParams(msg, nil, &channel, &clients, &topic); err != nil {
890 return err
891 }
892
[181]893 pl := uc.getPendingLIST()
[177]894 if pl == nil {
895 return fmt.Errorf("unexpected RPL_LIST: no matching pending LIST")
896 }
897
898 uc.forEachDownstreamByID(pl.downstreamID, func(dc *downstreamConn) {
899 dc.SendMessage(&irc.Message{
900 Prefix: dc.srv.prefix(),
901 Command: irc.RPL_LIST,
902 Params: []string{dc.nick, dc.marshalChannel(uc, channel), clients, topic},
903 })
904 })
905 case irc.RPL_LISTEND:
[181]906 ok := uc.endPendingLISTs(false)
[177]907 if !ok {
908 return fmt.Errorf("unexpected RPL_LISTEND: no matching pending LIST")
909 }
[19]910 case irc.RPL_NAMREPLY:
[43]911 var name, statusStr, members string
912 if err := parseMessageParams(msg, nil, &statusStr, &name, &members); err != nil {
913 return err
[19]914 }
[140]915
916 ch, ok := uc.channels[name]
917 if !ok {
918 // NAMES on a channel we have not joined, forward to downstream
[161]919 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
[140]920 channel := dc.marshalChannel(uc, name)
[174]921 members := splitSpace(members)
[140]922 for i, member := range members {
923 membership, nick := uc.parseMembershipPrefix(member)
924 members[i] = membership.String() + dc.marshalNick(uc, nick)
925 }
926 memberStr := strings.Join(members, " ")
927
928 dc.SendMessage(&irc.Message{
929 Prefix: dc.srv.prefix(),
930 Command: irc.RPL_NAMREPLY,
931 Params: []string{dc.nick, statusStr, channel, memberStr},
932 })
933 })
934 return nil
[19]935 }
936
[43]937 status, err := parseChannelStatus(statusStr)
[19]938 if err != nil {
939 return err
940 }
941 ch.Status = status
942
[174]943 for _, s := range splitSpace(members) {
[139]944 membership, nick := uc.parseMembershipPrefix(s)
[19]945 ch.Members[nick] = membership
946 }
947 case irc.RPL_ENDOFNAMES:
[43]948 var name string
949 if err := parseMessageParams(msg, nil, &name); err != nil {
950 return err
[25]951 }
[140]952
953 ch, ok := uc.channels[name]
954 if !ok {
955 // NAMES on a channel we have not joined, forward to downstream
[161]956 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
[140]957 channel := dc.marshalChannel(uc, name)
958
959 dc.SendMessage(&irc.Message{
960 Prefix: dc.srv.prefix(),
961 Command: irc.RPL_ENDOFNAMES,
962 Params: []string{dc.nick, channel, "End of /NAMES list"},
963 })
964 })
965 return nil
[25]966 }
967
[34]968 if ch.complete {
969 return fmt.Errorf("received unexpected RPL_ENDOFNAMES")
970 }
[25]971 ch.complete = true
[27]972
[73]973 uc.forEachDownstream(func(dc *downstreamConn) {
[27]974 forwardChannel(dc, ch)
[40]975 })
[127]976 case irc.RPL_WHOREPLY:
977 var channel, username, host, server, nick, mode, trailing string
978 if err := parseMessageParams(msg, nil, &channel, &username, &host, &server, &nick, &mode, &trailing); err != nil {
979 return err
980 }
981
982 parts := strings.SplitN(trailing, " ", 2)
983 if len(parts) != 2 {
984 return fmt.Errorf("received malformed RPL_WHOREPLY: wrong trailing parameter: %s", trailing)
985 }
986 realname := parts[1]
987 hops, err := strconv.Atoi(parts[0])
988 if err != nil {
989 return fmt.Errorf("received malformed RPL_WHOREPLY: wrong hop count: %s", parts[0])
990 }
991 hops++
992
993 trailing = strconv.Itoa(hops) + " " + realname
994
[161]995 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
[127]996 channel := channel
997 if channel != "*" {
998 channel = dc.marshalChannel(uc, channel)
999 }
1000 nick := dc.marshalNick(uc, nick)
1001 dc.SendMessage(&irc.Message{
1002 Prefix: dc.srv.prefix(),
1003 Command: irc.RPL_WHOREPLY,
1004 Params: []string{dc.nick, channel, username, host, server, nick, mode, trailing},
1005 })
1006 })
1007 case irc.RPL_ENDOFWHO:
1008 var name string
1009 if err := parseMessageParams(msg, nil, &name); err != nil {
1010 return err
1011 }
1012
[161]1013 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
[127]1014 name := name
1015 if name != "*" {
1016 // TODO: support WHO masks
1017 name = dc.marshalEntity(uc, name)
1018 }
1019 dc.SendMessage(&irc.Message{
1020 Prefix: dc.srv.prefix(),
1021 Command: irc.RPL_ENDOFWHO,
[142]1022 Params: []string{dc.nick, name, "End of /WHO list"},
[127]1023 })
1024 })
[128]1025 case irc.RPL_WHOISUSER:
1026 var nick, username, host, realname string
1027 if err := parseMessageParams(msg, nil, &nick, &username, &host, nil, &realname); err != nil {
1028 return err
1029 }
1030
[161]1031 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
[128]1032 nick := dc.marshalNick(uc, nick)
1033 dc.SendMessage(&irc.Message{
1034 Prefix: dc.srv.prefix(),
1035 Command: irc.RPL_WHOISUSER,
1036 Params: []string{dc.nick, nick, username, host, "*", realname},
1037 })
1038 })
1039 case irc.RPL_WHOISSERVER:
1040 var nick, server, serverInfo string
1041 if err := parseMessageParams(msg, nil, &nick, &server, &serverInfo); err != nil {
1042 return err
1043 }
1044
[161]1045 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
[128]1046 nick := dc.marshalNick(uc, nick)
1047 dc.SendMessage(&irc.Message{
1048 Prefix: dc.srv.prefix(),
1049 Command: irc.RPL_WHOISSERVER,
1050 Params: []string{dc.nick, nick, server, serverInfo},
1051 })
1052 })
1053 case irc.RPL_WHOISOPERATOR:
1054 var nick string
1055 if err := parseMessageParams(msg, nil, &nick); err != nil {
1056 return err
1057 }
1058
[161]1059 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
[128]1060 nick := dc.marshalNick(uc, nick)
1061 dc.SendMessage(&irc.Message{
1062 Prefix: dc.srv.prefix(),
1063 Command: irc.RPL_WHOISOPERATOR,
1064 Params: []string{dc.nick, nick, "is an IRC operator"},
1065 })
1066 })
1067 case irc.RPL_WHOISIDLE:
1068 var nick string
1069 if err := parseMessageParams(msg, nil, &nick, nil); err != nil {
1070 return err
1071 }
1072
[161]1073 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
[128]1074 nick := dc.marshalNick(uc, nick)
1075 params := []string{dc.nick, nick}
1076 params = append(params, msg.Params[2:]...)
1077 dc.SendMessage(&irc.Message{
1078 Prefix: dc.srv.prefix(),
1079 Command: irc.RPL_WHOISIDLE,
1080 Params: params,
1081 })
1082 })
1083 case irc.RPL_WHOISCHANNELS:
1084 var nick, channelList string
1085 if err := parseMessageParams(msg, nil, &nick, &channelList); err != nil {
1086 return err
1087 }
[174]1088 channels := splitSpace(channelList)
[128]1089
[161]1090 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
[128]1091 nick := dc.marshalNick(uc, nick)
1092 channelList := make([]string, len(channels))
1093 for i, channel := range channels {
[139]1094 prefix, channel := uc.parseMembershipPrefix(channel)
[128]1095 channel = dc.marshalChannel(uc, channel)
1096 channelList[i] = prefix.String() + channel
1097 }
1098 channels := strings.Join(channelList, " ")
1099 dc.SendMessage(&irc.Message{
1100 Prefix: dc.srv.prefix(),
1101 Command: irc.RPL_WHOISCHANNELS,
1102 Params: []string{dc.nick, nick, channels},
1103 })
1104 })
1105 case irc.RPL_ENDOFWHOIS:
1106 var nick string
1107 if err := parseMessageParams(msg, nil, &nick); err != nil {
1108 return err
1109 }
1110
[161]1111 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
[128]1112 nick := dc.marshalNick(uc, nick)
1113 dc.SendMessage(&irc.Message{
1114 Prefix: dc.srv.prefix(),
1115 Command: irc.RPL_ENDOFWHOIS,
[142]1116 Params: []string{dc.nick, nick, "End of /WHOIS list"},
[128]1117 })
1118 })
[36]1119 case "PRIVMSG":
[117]1120 if msg.Prefix == nil {
1121 return fmt.Errorf("expected a prefix")
1122 }
1123
[217]1124 var entity, text string
1125 if err := parseMessageParams(msg, &entity, &text); err != nil {
[69]1126 return err
1127 }
[117]1128
1129 if msg.Prefix.Name == serviceNick {
1130 uc.logger.Printf("skipping PRIVMSG from soju's service: %v", msg)
1131 break
1132 }
[217]1133 if entity == serviceNick {
[117]1134 uc.logger.Printf("skipping PRIVMSG to soju's service: %v", msg)
1135 break
1136 }
1137
[217]1138 target := entity
1139 if target == uc.nick {
[178]1140 target = msg.Prefix.Name
1141 }
[215]1142 uc.appendLog(target, msg)
[178]1143
[143]1144 uc.network.ring.Produce(msg)
[115]1145 case "INVITE":
1146 var nick string
1147 var channel string
1148 if err := parseMessageParams(msg, &nick, &channel); err != nil {
1149 return err
1150 }
1151
1152 uc.forEachDownstream(func(dc *downstreamConn) {
1153 dc.SendMessage(&irc.Message{
1154 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
1155 Command: "INVITE",
1156 Params: []string{dc.marshalNick(uc, nick), dc.marshalChannel(uc, channel)},
1157 })
1158 })
[163]1159 case irc.RPL_INVITING:
1160 var nick string
1161 var channel string
1162 if err := parseMessageParams(msg, &nick, &channel); err != nil {
1163 return err
1164 }
1165
1166 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1167 dc.SendMessage(&irc.Message{
1168 Prefix: dc.srv.prefix(),
1169 Command: irc.RPL_INVITING,
1170 Params: []string{dc.nick, dc.marshalNick(uc, nick), dc.marshalChannel(uc, channel)},
1171 })
1172 })
[177]1173 case irc.ERR_UNKNOWNCOMMAND, irc.RPL_TRYAGAIN:
1174 var command, reason string
1175 if err := parseMessageParams(msg, nil, &command, &reason); err != nil {
1176 return err
1177 }
1178
1179 if command == "LIST" {
[181]1180 ok := uc.endPendingLISTs(false)
[177]1181 if !ok {
1182 return fmt.Errorf("unexpected response for LIST: %q: no matching pending LIST", msg.Command)
1183 }
1184 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
1185 dc.SendMessage(&irc.Message{
1186 Prefix: uc.srv.prefix(),
1187 Command: msg.Command,
1188 Params: []string{dc.nick, "LIST", reason},
1189 })
1190 })
1191 }
[152]1192 case "TAGMSG":
1193 // TODO: relay to downstream connections that accept message-tags
[155]1194 case "ACK":
1195 // Ignore
[198]1196 case irc.RPL_NOWAWAY, irc.RPL_UNAWAY:
1197 // Ignore
[16]1198 case irc.RPL_YOURHOST, irc.RPL_CREATED:
[14]1199 // Ignore
1200 case irc.RPL_LUSERCLIENT, irc.RPL_LUSEROP, irc.RPL_LUSERUNKNOWN, irc.RPL_LUSERCHANNELS, irc.RPL_LUSERME:
1201 // Ignore
1202 case irc.RPL_MOTDSTART, irc.RPL_MOTD, irc.RPL_ENDOFMOTD:
1203 // Ignore
[177]1204 case irc.RPL_LISTSTART:
1205 // Ignore
[14]1206 case rpl_localusers, rpl_globalusers:
1207 // Ignore
[96]1208 case irc.RPL_STATSVLINE, rpl_statsping, irc.RPL_STATSBLINE, irc.RPL_STATSDLINE:
[14]1209 // Ignore
[13]1210 default:
[95]1211 uc.logger.Printf("unhandled message: %v", msg)
[13]1212 }
[14]1213 return nil
[13]1214}
1215
// splitSpace splits s around runs of the ASCII space character (and only that
// character: tabs and other whitespace are kept as part of a field). Leading,
// trailing and repeated spaces never produce empty fields. The returned slice
// is never nil.
func splitSpace(s string) []string {
	pieces := strings.Split(s, " ")
	fields := make([]string, 0, len(pieces))
	for _, piece := range pieces {
		// Consecutive, leading or trailing spaces yield empty pieces: drop them.
		if piece == "" {
			continue
		}
		fields = append(fields, piece)
	}
	return fields
}
1221
[55]1222func (uc *upstreamConn) register() {
[77]1223 uc.nick = uc.network.Nick
1224 uc.username = uc.network.Username
1225 if uc.username == "" {
1226 uc.username = uc.nick
1227 }
1228 uc.realname = uc.network.Realname
1229 if uc.realname == "" {
1230 uc.realname = uc.nick
1231 }
1232
[60]1233 uc.SendMessage(&irc.Message{
[92]1234 Command: "CAP",
1235 Params: []string{"LS", "302"},
1236 })
1237
[93]1238 if uc.network.Pass != "" {
1239 uc.SendMessage(&irc.Message{
1240 Command: "PASS",
1241 Params: []string{uc.network.Pass},
1242 })
1243 }
1244
[92]1245 uc.SendMessage(&irc.Message{
[13]1246 Command: "NICK",
[69]1247 Params: []string{uc.nick},
[60]1248 })
1249 uc.SendMessage(&irc.Message{
[13]1250 Command: "USER",
[77]1251 Params: []string{uc.username, "0", "*", uc.realname},
[60]1252 })
[44]1253}
[13]1254
[197]1255func (uc *upstreamConn) runUntilRegistered() error {
1256 for !uc.registered {
[212]1257 msg, err := uc.ReadMessage()
[197]1258 if err != nil {
1259 return fmt.Errorf("failed to read message: %v", err)
1260 }
1261
1262 if err := uc.handleMessage(msg); err != nil {
1263 return fmt.Errorf("failed to handle message %q: %v", msg, err)
1264 }
1265 }
1266
1267 return nil
1268}
1269
[95]1270func (uc *upstreamConn) requestSASL() bool {
1271 if uc.network.SASL.Mechanism == "" {
1272 return false
1273 }
1274
1275 v, ok := uc.caps["sasl"]
1276 if !ok {
1277 return false
1278 }
1279 if v != "" {
1280 mechanisms := strings.Split(v, ",")
1281 found := false
1282 for _, mech := range mechanisms {
1283 if strings.EqualFold(mech, uc.network.SASL.Mechanism) {
1284 found = true
1285 break
1286 }
1287 }
1288 if !found {
1289 return false
1290 }
1291 }
1292
1293 return true
1294}
1295
1296func (uc *upstreamConn) handleCapAck(name string, ok bool) error {
1297 auth := &uc.network.SASL
1298 switch name {
1299 case "sasl":
1300 if !ok {
1301 uc.logger.Printf("server refused to acknowledge the SASL capability")
1302 return nil
1303 }
1304
1305 switch auth.Mechanism {
1306 case "PLAIN":
1307 uc.logger.Printf("starting SASL PLAIN authentication with username %q", auth.Plain.Username)
1308 uc.saslClient = sasl.NewPlainClient("", auth.Plain.Username, auth.Plain.Password)
1309 default:
1310 return fmt.Errorf("unsupported SASL mechanism %q", name)
1311 }
1312
1313 uc.SendMessage(&irc.Message{
1314 Command: "AUTHENTICATE",
1315 Params: []string{auth.Mechanism},
1316 })
[152]1317 case "message-tags":
1318 uc.tagsSupported = ok
[155]1319 case "labeled-response":
1320 uc.labelsSupported = ok
[193]1321 case "batch", "server-time":
1322 // Nothing to do
1323 default:
1324 uc.logger.Printf("received CAP ACK/NAK for a cap we don't support: %v", name)
[95]1325 }
1326 return nil
1327}
1328
[165]1329func (uc *upstreamConn) readMessages(ch chan<- event) error {
[13]1330 for {
[210]1331 msg, err := uc.ReadMessage()
[13]1332 if err == io.EOF {
1333 break
1334 } else if err != nil {
1335 return fmt.Errorf("failed to read IRC command: %v", err)
1336 }
1337
[165]1338 ch <- eventUpstreamMessage{msg, uc}
[13]1339 }
1340
[45]1341 return nil
[13]1342}
[60]1343
[176]1344func (uc *upstreamConn) SendMessageLabeled(downstreamID uint64, msg *irc.Message) {
[155]1345 if uc.labelsSupported {
1346 if msg.Tags == nil {
1347 msg.Tags = make(map[string]irc.TagValue)
1348 }
[176]1349 msg.Tags["label"] = irc.TagValue(fmt.Sprintf("sd-%d-%d", downstreamID, uc.nextLabelID))
[161]1350 uc.nextLabelID++
[155]1351 }
1352 uc.SendMessage(msg)
1353}
[178]1354
1355// TODO: handle moving logs when a network name changes, when support for this is added
[215]1356func (uc *upstreamConn) appendLog(entity string, msg *irc.Message) {
[178]1357 if uc.srv.LogPath == "" {
1358 return
1359 }
[215]1360
1361 ml, ok := uc.messageLoggers[entity]
1362 if !ok {
1363 ml = newMessageLogger(uc, entity)
1364 uc.messageLoggers[entity] = ml
[178]1365 }
1366
[215]1367 if err := ml.Append(msg); err != nil {
1368 uc.logger.Printf("failed to log message: %v", err)
[178]1369 }
1370}
[198]1371
1372func (uc *upstreamConn) updateAway() {
1373 away := true
1374 uc.forEachDownstream(func(*downstreamConn) {
1375 away = false
1376 })
1377 if away == uc.away {
1378 return
1379 }
1380 if away {
1381 uc.SendMessage(&irc.Message{
1382 Command: "AWAY",
1383 Params: []string{"Auto away"},
1384 })
1385 } else {
1386 uc.SendMessage(&irc.Message{
1387 Command: "AWAY",
1388 })
1389 }
1390 uc.away = away
1391}
Note: See TracBrowser for help on using the repository browser.