Changeset 682 in code for trunk


Ignore:
Timestamp:
Nov 9, 2021, 9:09:17 PM (4 years ago)
Author:
contact
Message:

Add a queue for WHO commands

This has the following upsides:

  • We can now routes WHO replies to the correct client, without broadcasting them to everybody.
  • We are less likely to hit server rate limits when multiple downstreams are issuing WHO commands at the same time.
Location:
trunk
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • trunk/downstream.go

    r681 r682  
    18651865                }
    18661866
    1867                 uc.enqueueLIST(dc, msg)
     1867                uc.enqueueCommand(dc, msg)
    18681868        case "NAMES":
    18691869                if len(msg.Params) == 0 {
     
    19871987                }
    19881988
    1989                 uc.SendMessageLabeled(dc.id, &irc.Message{
     1989                uc.enqueueCommand(dc, &irc.Message{
    19901990                        Command: "WHO",
    19911991                        Params:  params,
  • trunk/upstream.go

    r681 r682  
    7575type pendingUpstreamCommand struct {
    7676        downstreamID uint64
    77         cmd          *irc.Message
     77        msg          *irc.Message
    7878}
    7979
     
    110110        casemapIsSet bool
    111111
    112         // Queue of LIST commands in progress. The first entry has been sent to the
    113         // server and is awaiting reply. The following entries have not been sent
    114         // yet.
    115         pendingLIST []pendingUpstreamCommand
     112        // Queue of commands in progress, indexed by type. The first entry has been
     113        // sent to the server and is awaiting reply. The following entries have not
     114        // been sent yet.
     115        pendingCmds map[string][]pendingUpstreamCommand
    116116
    117117        gotMotd bool
     
    209209                availableMemberships:  stdMemberships,
    210210                isupport:              make(map[string]*string),
     211                pendingCmds:           make(map[string][]pendingUpstreamCommand),
    211212        }
    212213        return uc, nil
     
    226227}
    227228
     229func (uc *upstreamConn) downstreamByID(id uint64) *downstreamConn {
     230        for _, dc := range uc.user.downstreamConns {
     231                if dc.id == id {
     232                        return dc
     233                }
     234        }
     235        return nil
     236}
     237
    228238func (uc *upstreamConn) getChannel(name string) (*upstreamChannel, error) {
    229239        ch := uc.channels.Value(name)
     
    242252}
    243253
    244 func (uc *upstreamConn) endPendingLISTs() {
    245         for _, pendingCmd := range uc.pendingLIST {
    246                 uc.forEachDownstreamByID(pendingCmd.downstreamID, func(dc *downstreamConn) {
    247                         dc.SendMessage(&irc.Message{
    248                                 Prefix:  dc.srv.prefix(),
    249                                 Command: irc.RPL_LISTEND,
    250                                 Params:  []string{dc.nick, "End of /LIST"},
    251                         })
    252                 })
    253         }
    254         uc.pendingLIST = nil
    255 }
    256 
    257 func (uc *upstreamConn) sendNextPendingLIST() {
    258         if len(uc.pendingLIST) == 0 {
     254func (uc *upstreamConn) endPendingCommands() {
     255        for _, l := range uc.pendingCmds {
     256                for _, pendingCmd := range l {
     257                        dc := uc.downstreamByID(pendingCmd.downstreamID)
     258                        if dc == nil {
     259                                continue
     260                        }
     261
     262                        switch pendingCmd.msg.Command {
     263                        case "LIST":
     264                                dc.SendMessage(&irc.Message{
     265                                        Prefix:  dc.srv.prefix(),
     266                                        Command: irc.RPL_LISTEND,
     267                                        Params:  []string{dc.nick, "End of /LIST"},
     268                                })
     269                        case "WHO":
     270                                mask := "*"
     271                                if len(pendingCmd.msg.Params) > 0 {
     272                                        mask = pendingCmd.msg.Params[0]
     273                                }
     274                                dc.SendMessage(&irc.Message{
     275                                        Prefix:  dc.srv.prefix(),
     276                                        Command: irc.RPL_ENDOFWHO,
     277                                        Params:  []string{dc.nick, mask, "End of /WHO"},
     278                                })
     279                        default:
     280                                panic(fmt.Errorf("Unsupported pending command %q", pendingCmd.msg.Command))
     281                        }
     282                }
     283        }
     284
     285        uc.pendingCmds = make(map[string][]pendingUpstreamCommand)
     286}
     287
     288func (uc *upstreamConn) sendNextPendingCommand(cmd string) {
     289        if len(uc.pendingCmds[cmd]) == 0 {
    259290                return
    260291        }
    261         uc.SendMessage(uc.pendingLIST[0].cmd)
    262 }
    263 
    264 func (uc *upstreamConn) enqueueLIST(dc *downstreamConn, cmd *irc.Message) {
    265         uc.pendingLIST = append(uc.pendingLIST, pendingUpstreamCommand{
     292        uc.SendMessage(uc.pendingCmds[cmd][0].msg)
     293}
     294
     295func (uc *upstreamConn) enqueueCommand(dc *downstreamConn, msg *irc.Message) {
     296        switch msg.Command {
     297        case "LIST", "WHO":
     298                // Supported
     299        default:
     300                panic(fmt.Errorf("Unsupported pending command %q", msg.Command))
     301        }
     302
     303        uc.pendingCmds[msg.Command] = append(uc.pendingCmds[msg.Command], pendingUpstreamCommand{
    266304                downstreamID: dc.id,
    267                 cmd:          cmd,
     305                msg:          msg,
    268306        })
    269307
    270         if len(uc.pendingLIST) == 1 {
    271                 uc.sendNextPendingLIST()
    272         }
    273 }
    274 
    275 func (uc *upstreamConn) currentPendingLIST() (*downstreamConn, *irc.Message) {
    276         if len(uc.pendingLIST) == 0 {
     308        if len(uc.pendingCmds[msg.Command]) == 1 {
     309                uc.sendNextPendingCommand(msg.Command)
     310        }
     311}
     312
     313func (uc *upstreamConn) currentPendingCommand(cmd string) (*downstreamConn, *irc.Message) {
     314        if len(uc.pendingCmds[cmd]) == 0 {
    277315                return nil, nil
    278316        }
    279317
    280         pendingCmd := uc.pendingLIST[0]
    281         for _, dc := range uc.user.downstreamConns {
    282                 if dc.id == pendingCmd.downstreamID {
    283                         return dc, pendingCmd.cmd
    284                 }
    285         }
    286 
    287         return nil, pendingCmd.cmd
    288 }
    289 
    290 func (uc *upstreamConn) dequeueLIST() (*downstreamConn, *irc.Message) {
    291         dc, cmd := uc.currentPendingLIST()
    292 
    293         if len(uc.pendingLIST) > 0 {
    294                 copy(uc.pendingLIST, uc.pendingLIST[1:])
    295                 uc.pendingLIST = uc.pendingLIST[:len(uc.pendingLIST)-1]
    296         }
    297 
    298         uc.sendNextPendingLIST()
    299 
    300         return dc, cmd
     318        pendingCmd := uc.pendingCmds[cmd][0]
     319        return uc.downstreamByID(pendingCmd.downstreamID), pendingCmd.msg
     320}
     321
     322func (uc *upstreamConn) dequeueCommand(cmd string) (*downstreamConn, *irc.Message) {
     323        dc, msg := uc.currentPendingCommand(cmd)
     324
     325        if len(uc.pendingCmds[cmd]) > 0 {
     326                copy(uc.pendingCmds[cmd], uc.pendingCmds[cmd][1:])
     327                uc.pendingCmds[cmd] = uc.pendingCmds[cmd][:len(uc.pendingCmds[cmd])-1]
     328        }
     329
     330        uc.sendNextPendingCommand(cmd)
     331
     332        return dc, msg
    301333}
    302334
     
    10961128                }
    10971129
    1098                 dc, cmd := uc.currentPendingLIST()
     1130                dc, cmd := uc.currentPendingCommand("LIST")
    10991131                if cmd == nil {
    11001132                        return fmt.Errorf("unexpected RPL_LIST: no matching pending LIST")
     
    11091141                })
    11101142        case irc.RPL_LISTEND:
    1111                 dc, cmd := uc.dequeueLIST()
     1143                dc, cmd := uc.dequeueCommand("LIST")
    11121144                if cmd == nil {
    11131145                        return fmt.Errorf("unexpected RPL_LISTEND: no matching pending LIST")
     
    11961228                }
    11971229
     1230                dc, cmd := uc.currentPendingCommand("WHO")
     1231                if cmd == nil {
     1232                        return fmt.Errorf("unexpected RPL_WHOREPLY: no matching pending WHO")
     1233                } else if dc == nil {
     1234                        return nil
     1235                }
     1236
    11981237                parts := strings.SplitN(trailing, " ", 2)
    11991238                if len(parts) != 2 {
     
    12091248                trailing = strconv.Itoa(hops) + " " + realname
    12101249
    1211                 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
    1212                         channel := channel
    1213                         if channel != "*" {
    1214                                 channel = dc.marshalEntity(uc.network, channel)
    1215                         }
    1216                         nick := dc.marshalEntity(uc.network, nick)
    1217                         dc.SendMessage(&irc.Message{
    1218                                 Prefix:  dc.srv.prefix(),
    1219                                 Command: irc.RPL_WHOREPLY,
    1220                                 Params:  []string{dc.nick, channel, username, host, server, nick, mode, trailing},
    1221                         })
    1222                 })
     1250                if channel != "*" {
     1251                        channel = dc.marshalEntity(uc.network, channel)
     1252                }
     1253                nick = dc.marshalEntity(uc.network, nick)
     1254                dc.SendMessage(&irc.Message{
     1255                        Prefix:  dc.srv.prefix(),
     1256                        Command: irc.RPL_WHOREPLY,
     1257                        Params:  []string{dc.nick, channel, username, host, server, nick, mode, trailing},
     1258                })
     1259        case rpl_whospcrpl:
     1260                dc, cmd := uc.currentPendingCommand("WHO")
     1261                if cmd == nil {
     1262                        return fmt.Errorf("unexpected RPL_WHOSPCRPL: no matching pending WHO")
     1263                } else if dc == nil {
     1264                        return nil
     1265                }
     1266
     1267                // Only supported in single-upstream mode, so forward as-is
     1268                dc.SendMessage(msg)
    12231269        case irc.RPL_ENDOFWHO:
    12241270                var name string
     
    12271273                }
    12281274
    1229                 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
    1230                         name := name
    1231                         if name != "*" {
    1232                                 // TODO: support WHO masks
    1233                                 name = dc.marshalEntity(uc.network, name)
    1234                         }
    1235                         dc.SendMessage(&irc.Message{
    1236                                 Prefix:  dc.srv.prefix(),
    1237                                 Command: irc.RPL_ENDOFWHO,
    1238                                 Params:  []string{dc.nick, name, "End of /WHO list"},
    1239                         })
     1275                dc, cmd := uc.dequeueCommand("WHO")
     1276                if cmd == nil {
     1277                        return fmt.Errorf("unexpected RPL_ENDOFWHO: no matching pending WHO")
     1278                } else if dc == nil {
     1279                        return nil
     1280                }
     1281
     1282                mask := "*"
     1283                if len(cmd.Params) > 0 {
     1284                        mask = cmd.Params[0]
     1285                }
     1286                dc.SendMessage(&irc.Message{
     1287                        Prefix:  dc.srv.prefix(),
     1288                        Command: irc.RPL_ENDOFWHO,
     1289                        Params:  []string{dc.nick, mask, "End of /WHO list"},
    12401290                })
    12411291        case irc.RPL_WHOISUSER:
     
    14371487                }
    14381488
    1439                 if command == "LIST" {
    1440                         uc.endPendingLISTs()
     1489                if command == "LIST" || command == "WHO" {
     1490                        dc, _ := uc.dequeueCommand(command)
     1491                        if dc != nil && downstreamID == 0 {
     1492                                downstreamID = dc.id
     1493                        }
    14411494                }
    14421495
     
    14541507        case irc.RPL_YOURHOST, irc.RPL_CREATED:
    14551508                // Ignore
    1456         case rpl_whospcrpl:
    1457                 // Not supported in multi-upstream mode, forward as-is
    1458                 uc.forEachDownstream(func(dc *downstreamConn) {
    1459                         dc.SendMessage(msg)
    1460                 })
    14611509        case irc.RPL_LUSERCLIENT, irc.RPL_LUSEROP, irc.RPL_LUSERUNKNOWN, irc.RPL_LUSERCHANNELS, irc.RPL_LUSERME:
    14621510                fallthrough
  • trunk/user.go

    r681 r682  
    682682        uc.network.conn = nil
    683683
    684         uc.endPendingLISTs()
     684        uc.endPendingCommands()
    685685
    686686        for _, entry := range uc.channels.innerMap {
Note: See TracChangeset for help on using the changeset viewer.