Changeset 177 in code for trunk


Ignore:
Timestamp:
Mar 27, 2020, 11:07:20 PM (5 years ago)
Author:
delthas
Message:

Add LIST support

This commit adds support for downstream LIST messages from multiple
concurrent downstreams to multiple concurrent upstreams, including
support for multiple pending LIST requests from the same downstream.

Because a unique RPL_LISTEND message must be sent to the requesting
downstream, and that there might be multiple upstreams, each sending
their own RPL_LISTEND, a cache of RPL_LISTEND replies of some sort is
required to match RPL_LISTEND together in order to only send one back
downstream.

This commit adds a list of "pending LIST" structs, which each contain a
map of all upstreams that yet need to send a RPL_LISTEND, and the
corresponding LIST request associated with that response. This list of
pending LISTs is sorted according to the order that the requesting
downstreams sent the LIST messages in. Each pending set also stores the
id of the requesting downstream, in order to only forward the replies to
it and no other downstream. (This is important because LIST replies can
typically amount to several thousands messages on large servers.)

When a single downstream makes multiple LIST requests, only the first
one will be immediately sent to the upstream servers. The next ones will
be buffered until the first one is completed. Distinct downstreams can
make concurrent LIST requests without any request buffering.

Each RPL_LIST message is forwarded to the downstream of the first
matching pending LIST struct.

When an upstream sends an RPL_LISTEND message, the upstream is removed
from the first matching pending LIST struct, but that message is not
immediately forwarded downstream. If there are no remaining pending LIST
requests in that struct is then empty, that means all upstreams have
sent back all their RPL_LISTEND replies (which means they also sent all
their RPL_LIST replies); so a unique RPL_LISTEND is sent to downstream
and that pending LIST set is removed from the cache.

Upstreams are removed from the pending LIST structs in two other cases:

  • when they are closed (to avoid stalling because of a disconnected

upstream that will never reply to the LIST message): they are removed
from all pending LIST structs

  • when they reply with an ERR_UNKNOWNCOMMAND or RPL_TRYAGAIN LIST reply,

which is typically used when a user is not allowed to LIST because they
just joined the server: they are removed from the first pending LIST
struct, as if an RPL_LISTEND message was received

Location:
trunk
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • trunk/downstream.go

    r176 r177  
    10781078                        sendTopic(dc, ch)
    10791079                }
     1080        case "LIST":
     1081                // TODO: support ELIST when supported by all upstreams
     1082
     1083                dc.user.pendingLISTsLock.Lock()
     1084                defer dc.user.pendingLISTsLock.Unlock()
     1085
     1086                pl := pendingLIST{
     1087                        downstreamID:    dc.id,
     1088                        pendingCommands: make(map[int64]*irc.Message),
     1089                }
     1090                var upstreamChannels map[int64][]string
     1091                if len(msg.Params) > 0 {
     1092                        upstreamChannels = make(map[int64][]string)
     1093                        channels := strings.Split(msg.Params[0], ",")
     1094                        for _, channel := range channels {
     1095                                uc, upstreamChannel, err := dc.unmarshalEntity(channel)
     1096                                if err != nil {
     1097                                        return err
     1098                                }
     1099                                upstreamChannels[uc.network.ID] = append(upstreamChannels[uc.network.ID], upstreamChannel)
     1100                        }
     1101                }
     1102
     1103                dc.user.pendingLISTs = append(dc.user.pendingLISTs, pl)
     1104                dc.forEachUpstream(func(uc *upstreamConn) {
     1105                        var params []string
     1106                        if upstreamChannels != nil {
     1107                                if channels, ok := upstreamChannels[uc.network.ID]; ok {
     1108                                        params = []string{strings.Join(channels, ",")}
     1109                                } else {
     1110                                        return
     1111                                }
     1112                        }
     1113                        pl.pendingCommands[uc.network.ID] = &irc.Message{
     1114                                Command: "LIST",
     1115                                Params:  params,
     1116                        }
     1117                        uc.trySendList(dc.id)
     1118                })
    10801119        case "NAMES":
    10811120                if len(msg.Params) == 0 {
  • trunk/upstream.go

    r176 r177  
    6060        saslClient  sasl.Client
    6161        saslStarted bool
     62
     63        // set of LIST commands in progress, per downstream
     64        // access is synchronized with user.pendingLISTsLock
     65        pendingLISTDownstreamSet map[uint64]struct{}
    6266}
    6367
     
    8084        outgoing := make(chan *irc.Message, 64)
    8185        uc := &upstreamConn{
    82                 network:               network,
    83                 logger:                logger,
    84                 net:                   netConn,
    85                 irc:                   irc.NewConn(netConn),
    86                 srv:                   network.user.srv,
    87                 user:                  network.user,
    88                 outgoing:              outgoing,
    89                 channels:              make(map[string]*upstreamChannel),
    90                 caps:                  make(map[string]string),
    91                 batches:               make(map[string]batch),
    92                 availableChannelTypes: stdChannelTypes,
    93                 availableChannelModes: stdChannelModes,
    94                 availableMemberships:  stdMemberships,
     86                network:                  network,
     87                logger:                   logger,
     88                net:                      netConn,
     89                irc:                      irc.NewConn(netConn),
     90                srv:                      network.user.srv,
     91                user:                     network.user,
     92                outgoing:                 outgoing,
     93                channels:                 make(map[string]*upstreamChannel),
     94                caps:                     make(map[string]string),
     95                batches:                  make(map[string]batch),
     96                availableChannelTypes:    stdChannelTypes,
     97                availableChannelModes:    stdChannelModes,
     98                availableMemberships:     stdMemberships,
     99                pendingLISTDownstreamSet: make(map[uint64]struct{}),
    95100        }
    96101
     
    137142        }
    138143        close(uc.closed)
     144
     145        uc.endPendingLists(true)
    139146        return nil
    140147}
     
    171178        }
    172179        return false
     180}
     181
     182func (uc *upstreamConn) getPendingList() *pendingLIST {
     183        uc.user.pendingLISTsLock.Lock()
     184        defer uc.user.pendingLISTsLock.Unlock()
     185        for _, pl := range uc.user.pendingLISTs {
     186                if _, ok := pl.pendingCommands[uc.network.ID]; !ok {
     187                        continue
     188                }
     189                return &pl
     190        }
     191        return nil
     192}
     193
     194func (uc *upstreamConn) endPendingLists(all bool) (found bool) {
     195        found = false
     196        uc.user.pendingLISTsLock.Lock()
     197        defer uc.user.pendingLISTsLock.Unlock()
     198        for i := 0; i < len(uc.user.pendingLISTs); i++ {
     199                pl := uc.user.pendingLISTs[i]
     200                if _, ok := pl.pendingCommands[uc.network.ID]; !ok {
     201                        continue
     202                }
     203                delete(pl.pendingCommands, uc.network.ID)
     204                if len(pl.pendingCommands) == 0 {
     205                        uc.user.pendingLISTs = append(uc.user.pendingLISTs[:i], uc.user.pendingLISTs[i+1:]...)
     206                        i--
     207                        uc.forEachDownstreamByID(pl.downstreamID, func(dc *downstreamConn) {
     208                                dc.SendMessage(&irc.Message{
     209                                        Prefix:  dc.srv.prefix(),
     210                                        Command: irc.RPL_LISTEND,
     211                                        Params:  []string{dc.nick, "End of /LIST"},
     212                                })
     213                        })
     214                }
     215                found = true
     216                if !all {
     217                        delete(uc.pendingLISTDownstreamSet, pl.downstreamID)
     218                        uc.user.forEachUpstream(func(uc *upstreamConn) {
     219                                uc.trySendList(pl.downstreamID)
     220                        })
     221                        return
     222                }
     223        }
     224        return
     225}
     226
     227func (uc *upstreamConn) trySendList(downstreamID uint64) {
     228        // must be called with a lock in uc.user.pendingLISTsLock
     229
     230        if _, ok := uc.pendingLISTDownstreamSet[downstreamID]; ok {
     231                // a LIST command is already pending
     232                // we will try again when that command is completed
     233                return
     234        }
     235
     236        for _, pl := range uc.user.pendingLISTs {
     237                if pl.downstreamID != downstreamID {
     238                        continue
     239                }
     240                // this is the first pending LIST command list of the downstream
     241                listCommand, ok := pl.pendingCommands[uc.network.ID]
     242                if !ok {
     243                        // there is no command for this upstream in these LIST commands
     244                        // do not send anything
     245                        continue
     246                }
     247                // there is a command for this upstream in these LIST commands
     248                // send it now
     249
     250                uc.SendMessageLabeled(downstreamID, listCommand)
     251
     252                uc.pendingLISTDownstreamSet[downstreamID] = struct{}{}
     253                return
     254        }
    173255}
    174256
     
    834916                }
    835917                ch.TopicTime = time.Unix(sec, 0)
     918        case irc.RPL_LIST:
     919                var channel, clients, topic string
     920                if err := parseMessageParams(msg, nil, &channel, &clients, &topic); err != nil {
     921                        return err
     922                }
     923
     924                pl := uc.getPendingList()
     925                if pl == nil {
     926                        return fmt.Errorf("unexpected RPL_LIST: no matching pending LIST")
     927                }
     928
     929                uc.forEachDownstreamByID(pl.downstreamID, func(dc *downstreamConn) {
     930                        dc.SendMessage(&irc.Message{
     931                                Prefix:  dc.srv.prefix(),
     932                                Command: irc.RPL_LIST,
     933                                Params:  []string{dc.nick, dc.marshalChannel(uc, channel), clients, topic},
     934                        })
     935                })
     936        case irc.RPL_LISTEND:
     937                ok := uc.endPendingLists(false)
     938                if !ok {
     939                        return fmt.Errorf("unexpected RPL_LISTEND: no matching pending LIST")
     940                }
    836941        case irc.RPL_NAMREPLY:
    837942                var name, statusStr, members string
     
    10911196                        })
    10921197                })
     1198        case irc.ERR_UNKNOWNCOMMAND, irc.RPL_TRYAGAIN:
     1199                var command, reason string
     1200                if err := parseMessageParams(msg, nil, &command, &reason); err != nil {
     1201                        return err
     1202                }
     1203
     1204                if command == "LIST" {
     1205                        ok := uc.endPendingLists(false)
     1206                        if !ok {
     1207                                return fmt.Errorf("unexpected response for LIST: %q: no matching pending LIST", msg.Command)
     1208                        }
     1209                        uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) {
     1210                                dc.SendMessage(&irc.Message{
     1211                                        Prefix:  uc.srv.prefix(),
     1212                                        Command: msg.Command,
     1213                                        Params:  []string{dc.nick, "LIST", reason},
     1214                                })
     1215                        })
     1216                }
    10931217        case "TAGMSG":
    10941218                // TODO: relay to downstream connections that accept message-tags
     
    11011225        case irc.RPL_MOTDSTART, irc.RPL_MOTD, irc.RPL_ENDOFMOTD:
    11021226                // Ignore
     1227        case irc.RPL_LISTSTART:
     1228                // Ignore
    11031229        case rpl_localusers, rpl_globalusers:
    11041230                // Ignore
  • trunk/user.go

    r175 r177  
    9797        networks        []*network
    9898        downstreamConns []*downstreamConn
     99
     100        // LIST commands in progress
     101        pendingLISTsLock sync.Mutex
     102        pendingLISTs     []pendingLIST
     103}
     104
     105type pendingLIST struct {
     106        downstreamID uint64
     107        // list of per-upstream LIST commands not yet sent or completed
     108        pendingCommands map[int64]*irc.Message
    99109}
    100110
Note: See TracChangeset for help on using the changeset viewer.