Changeset 757 in code


Ignore:
Timestamp:
Dec 8, 2021, 5:03:40 PM (4 years ago)
Author:
contact
Message:

Add context to {conn,upstreamConn}.SendMessage

This avoids blocking on upstream message rate limiting for too
long.

Location:
trunk
Files:
4 edited

Legend:

Unmodified
Added
Removed
  • trunk/conn.go

    r748 r757  
    218218// If the connection is closed before the message is sent, SendMessage silently
    219219// drops the message.
    220 func (c *conn) SendMessage(msg *irc.Message) {
     220func (c *conn) SendMessage(ctx context.Context, msg *irc.Message) {
    221221        c.lock.Lock()
    222222        defer c.lock.Unlock()
     
    225225                return
    226226        }
    227         c.outgoing <- msg
     227
     228        select {
     229        case c.outgoing <- msg:
     230                // Success
     231        case <-ctx.Done():
     232                c.logger.Printf("failed to send message: %v", ctx.Err())
     233        }
    228234}
    229235
  • trunk/downstream.go

    r754 r757  
    552552
    553553        dc.srv.metrics.downstreamOutMessagesTotal.Inc()
    554         dc.conn.SendMessage(msg)
     554        dc.conn.SendMessage(context.TODO(), msg)
    555555}
    556556
     
    16671667                                return
    16681668                        }
    1669                         uc.SendMessageLabeled(dc.id, &irc.Message{
     1669                        uc.SendMessageLabeled(ctx, dc.id, &irc.Message{
    16701670                                Command: "NICK",
    16711671                                Params:  []string{nick},
     
    17011701                        // support setname
    17021702                        if uc := n.conn; uc != nil && uc.caps["setname"] {
    1703                                 uc.SendMessageLabeled(dc.id, &irc.Message{
     1703                                uc.SendMessageLabeled(ctx, dc.id, &irc.Message{
    17041704                                        Command: "SETNAME",
    17051705                                        Params:  []string{realname},
     
    17761776                                params = append(params, key)
    17771777                        }
    1778                         uc.SendMessageLabeled(dc.id, &irc.Message{
     1778                        uc.SendMessageLabeled(ctx, dc.id, &irc.Message{
    17791779                                Command: "JOIN",
    17801780                                Params:  params,
     
    18361836                                        params = append(params, reason)
    18371837                                }
    1838                                 uc.SendMessageLabeled(dc.id, &irc.Message{
     1838                                uc.SendMessageLabeled(ctx, dc.id, &irc.Message{
    18391839                                        Command: "PART",
    18401840                                        Params:  params,
     
    18971897                                params = append(params, reason)
    18981898                        }
    1899                         uc.SendMessageLabeled(dc.id, &irc.Message{
     1899                        uc.SendMessageLabeled(ctx, dc.id, &irc.Message{
    19001900                                Command: "KICK",
    19011901                                Params:  params,
     
    19161916                        if modeStr != "" {
    19171917                                if uc := dc.upstream(); uc != nil {
    1918                                         uc.SendMessageLabeled(dc.id, &irc.Message{
     1918                                        uc.SendMessageLabeled(ctx, dc.id, &irc.Message{
    19191919                                                Command: "MODE",
    19201920                                                Params:  []string{uc.nick, modeStr},
     
    19571957                        params := []string{upstreamName, modeStr}
    19581958                        params = append(params, msg.Params[2:]...)
    1959                         uc.SendMessageLabeled(dc.id, &irc.Message{
     1959                        uc.SendMessageLabeled(ctx, dc.id, &irc.Message{
    19601960                                Command: "MODE",
    19611961                                Params:  params,
     
    20062006                if len(msg.Params) > 1 { // setting topic
    20072007                        topic := msg.Params[1]
    2008                         uc.SendMessageLabeled(dc.id, &irc.Message{
     2008                        uc.SendMessageLabeled(ctx, dc.id, &irc.Message{
    20092009                                Command: "TOPIC",
    20102010                                Params:  []string{upstreamName, topic},
     
    20712071                        } else {
    20722072                                // NAMES on a channel we have not joined, ask upstream
    2073                                 uc.SendMessageLabeled(dc.id, &irc.Message{
     2073                                uc.SendMessageLabeled(ctx, dc.id, &irc.Message{
    20742074                                        Command: "NAMES",
    20752075                                        Params:  []string{upstreamName},
     
    22712271                }
    22722272
    2273                 uc.SendMessageLabeled(dc.id, &irc.Message{
     2273                uc.SendMessageLabeled(ctx, dc.id, &irc.Message{
    22742274                        Command: "WHOIS",
    22752275                        Params:  params,
     
    23482348                                unmarshaledText = dc.unmarshalText(uc, text)
    23492349                        }
    2350                         uc.SendMessageLabeled(dc.id, &irc.Message{
     2350                        uc.SendMessageLabeled(ctx, dc.id, &irc.Message{
    23512351                                Tags:    tags,
    23522352                                Command: msg.Command,
     
    23992399                        }
    24002400
    2401                         uc.SendMessageLabeled(dc.id, &irc.Message{
     2401                        uc.SendMessageLabeled(ctx, dc.id, &irc.Message{
    24022402                                Tags:    tags,
    24032403                                Command: "TAGMSG",
     
    24312431                uc := ucChannel
    24322432
    2433                 uc.SendMessageLabeled(dc.id, &irc.Message{
     2433                uc.SendMessageLabeled(ctx, dc.id, &irc.Message{
    24342434                        Command: "INVITE",
    24352435                        Params:  []string{upstreamUser, upstreamChannel},
     
    28512851                }
    28522852
    2853                 uc.SendMessageLabeled(dc.id, msg)
     2853                uc.SendMessageLabeled(ctx, dc.id, msg)
    28542854        }
    28552855        return nil
  • trunk/service.go

    r755 r757  
    617617                return fmt.Errorf("failed to parse command %q: %v", params[1], err)
    618618        }
    619         uc.SendMessage(m)
     619        uc.SendMessage(ctx, m)
    620620
    621621        sendServicePRIVMSG(dc, fmt.Sprintf("sent command to %q", net.GetName()))
  • trunk/upstream.go

    r752 r757  
    343343                return
    344344        }
    345         uc.SendMessage(uc.pendingCmds[cmd][0].msg)
     345        uc.SendMessage(context.TODO(), uc.pendingCmds[cmd][0].msg)
    346346}
    347347
     
    451451        switch msg.Command {
    452452        case "PING":
    453                 uc.SendMessage(&irc.Message{
     453                uc.SendMessage(ctx, &irc.Message{
    454454                        Command: "PONG",
    455455                        Params:  msg.Params,
     
    530530                        }
    531531
    532                         uc.SendMessage(&irc.Message{
     532                        uc.SendMessage(ctx, &irc.Message{
    533533                                Command: "CAP",
    534534                                Params:  []string{"END"},
     
    584584                var challengeStr string
    585585                if err := parseMessageParams(msg, &challengeStr); err != nil {
    586                         uc.SendMessage(&irc.Message{
     586                        uc.SendMessage(ctx, &irc.Message{
    587587                                Command: "AUTHENTICATE",
    588588                                Params:  []string{"*"},
     
    596596                        challenge, err = base64.StdEncoding.DecodeString(challengeStr)
    597597                        if err != nil {
    598                                 uc.SendMessage(&irc.Message{
     598                                uc.SendMessage(ctx, &irc.Message{
    599599                                        Command: "AUTHENTICATE",
    600600                                        Params:  []string{"*"},
     
    613613                }
    614614                if err != nil {
    615                         uc.SendMessage(&irc.Message{
     615                        uc.SendMessage(ctx, &irc.Message{
    616616                                Command: "AUTHENTICATE",
    617617                                Params:  []string{"*"},
     
    626626                }
    627627
    628                 uc.SendMessage(&irc.Message{
     628                uc.SendMessage(ctx, &irc.Message{
    629629                        Command: "AUTHENTICATE",
    630630                        Params:  []string{respStr},
     
    670670
    671671                if !uc.registered {
    672                         uc.SendMessage(&irc.Message{
     672                        uc.SendMessage(ctx, &irc.Message{
    673673                                Command: "CAP",
    674674                                Params:  []string{"END"},
     
    708708
    709709                        for _, msg := range join(channels, keys) {
    710                                 uc.SendMessage(msg)
     710                                uc.SendMessage(ctx, msg)
    711711                        }
    712712                }
     
    932932                                uc.updateChannelAutoDetach(ch)
    933933
    934                                 uc.SendMessage(&irc.Message{
     934                                uc.SendMessage(ctx, &irc.Message{
    935935                                        Command: "MODE",
    936936                                        Params:  []string{ch},
     
    15321532                        if found {
    15331533                                uc.logger.Printf("desired nick %q is now available", wantNick)
    1534                                 uc.SendMessage(&irc.Message{
     1534                                uc.SendMessage(ctx, &irc.Message{
    15351535                                        Command: "NICK",
    15361536                                        Params:  []string{wantNick},
     
    17121712                        uc.nickCM = uc.network.casemap(uc.nick)
    17131713                        uc.logger.Printf("desired nick is not available, falling back to %q", uc.nick)
    1714                         uc.SendMessage(&irc.Message{
     1714                        uc.SendMessage(ctx, &irc.Message{
    17151715                                Command: "NICK",
    17161716                                Params:  []string{uc.nick},
     
    18261826        }
    18271827
    1828         uc.SendMessage(&irc.Message{
     1828        uc.SendMessage(context.TODO(), &irc.Message{
    18291829                Command: "CAP",
    18301830                Params:  []string{"REQ", strings.Join(requestCaps, " ")},
     
    18831883                }
    18841884
    1885                 uc.SendMessage(&irc.Message{
     1885                uc.SendMessage(context.TODO(), &irc.Message{
    18861886                        Command: "AUTHENTICATE",
    18871887                        Params:  []string{auth.Mechanism},
     
    19031903
    19041904func (uc *upstreamConn) register() {
     1905        ctx := context.TODO()
     1906
    19051907        uc.nick = GetNick(&uc.user.User, &uc.network.Network)
    19061908        uc.nickCM = uc.network.casemap(uc.nick)
     
    19081910        uc.realname = GetRealname(&uc.user.User, &uc.network.Network)
    19091911
    1910         uc.SendMessage(&irc.Message{
     1912        uc.SendMessage(ctx, &irc.Message{
    19111913                Command: "CAP",
    19121914                Params:  []string{"LS", "302"},
     
    19141916
    19151917        if uc.network.Pass != "" {
    1916                 uc.SendMessage(&irc.Message{
     1918                uc.SendMessage(ctx, &irc.Message{
    19171919                        Command: "PASS",
    19181920                        Params:  []string{uc.network.Pass},
     
    19201922        }
    19211923
    1922         uc.SendMessage(&irc.Message{
     1924        uc.SendMessage(ctx, &irc.Message{
    19231925                Command: "NICK",
    19241926                Params:  []string{uc.nick},
    19251927        })
    1926         uc.SendMessage(&irc.Message{
     1928        uc.SendMessage(ctx, &irc.Message{
    19271929                Command: "USER",
    19281930                Params:  []string{uc.username, "0", "*", uc.realname},
     
    19611963                        uc.logger.Printf("failed to parse connect command %q: %v", command, err)
    19621964                } else {
    1963                         uc.SendMessage(m)
     1965                        uc.SendMessage(context.TODO(), m)
    19641966                }
    19651967        }
     
    19831985}
    19841986
    1985 func (uc *upstreamConn) SendMessage(msg *irc.Message) {
     1987func (uc *upstreamConn) SendMessage(ctx context.Context, msg *irc.Message) {
    19861988        if !uc.caps["message-tags"] {
    19871989                msg = msg.Copy()
     
    19901992
    19911993        uc.srv.metrics.upstreamOutMessagesTotal.Inc()
    1992         uc.conn.SendMessage(msg)
    1993 }
    1994 
    1995 func (uc *upstreamConn) SendMessageLabeled(downstreamID uint64, msg *irc.Message) {
     1994        uc.conn.SendMessage(ctx, msg)
     1995}
     1996
     1997func (uc *upstreamConn) SendMessageLabeled(ctx context.Context, downstreamID uint64, msg *irc.Message) {
    19961998        if uc.caps["labeled-response"] {
    19971999                if msg.Tags == nil {
     
    20012003                uc.nextLabelID++
    20022004        }
    2003         uc.SendMessage(msg)
     2005        uc.SendMessage(ctx, msg)
    20042006}
    20052007
     
    20742076
    20752077func (uc *upstreamConn) updateAway() {
     2078        ctx := context.TODO()
     2079
    20762080        away := true
    20772081        uc.forEachDownstream(func(*downstreamConn) {
     
    20822086        }
    20832087        if away {
    2084                 uc.SendMessage(&irc.Message{
     2088                uc.SendMessage(ctx, &irc.Message{
    20852089                        Command: "AWAY",
    20862090                        Params:  []string{"Auto away"},
    20872091                })
    20882092        } else {
    2089                 uc.SendMessage(&irc.Message{
     2093                uc.SendMessage(ctx, &irc.Message{
    20902094                        Command: "AWAY",
    20912095                })
     
    21102114                return
    21112115        }
     2116
     2117        ctx := context.TODO()
    21122118
    21132119        add := make(map[string]struct{})
     
    21492155        if removeAll && len(addList) == 0 && len(removeList) > 0 {
    21502156                // Optimization when the last MONITOR-aware downstream disconnects
    2151                 uc.SendMessage(&irc.Message{
     2157                uc.SendMessage(ctx, &irc.Message{
    21522158                        Command: "MONITOR",
    21532159                        Params:  []string{"C"},
     
    21572163                msgs = append(msgs, generateMonitor("+", addList)...)
    21582164                for _, msg := range msgs {
    2159                         uc.SendMessage(msg)
     2165                        uc.SendMessage(ctx, msg)
    21602166                }
    21612167        }
Note: See TracChangeset for help on using the changeset viewer.