Changeset 398 in code for trunk


Ignore:
Timestamp:
Aug 19, 2020, 5:42:33 PM (5 years ago)
Author:
contact
Message:

Implement rate limiting for upstream messages

Allow up to 10 outgoing messages in a burst, then throttle to 1 message
each 2 seconds.

Closes: https://todo.sr.ht/~emersion/soju/87

Location:
trunk
Files:
5 edited

Legend:

Unmodified
Added
Removed
  • trunk/conn.go

    r384 r398  
    107107}
    108108
     109type rateLimiter struct {
     110        C <-chan struct{}
     111        ticker *time.Ticker
     112        stopped chan struct{}
     113}
     114
     115func newRateLimiter(delay time.Duration, burst int) *rateLimiter {
     116        ch := make(chan struct{}, burst)
     117        for i := 0; i < burst; i++ {
     118                ch <- struct{}{}
     119        }
     120        ticker := time.NewTicker(delay)
     121        stopped := make(chan struct{})
     122        go func() {
     123                for {
     124                        select {
     125                        case <-ticker.C:
     126                                select {
     127                                case ch <- struct{}{}:
     128                                        // This space is intentionally left blank
     129                                case <-stopped:
     130                                        return
     131                                }
     132                        case <-stopped:
     133                                return
     134                        }
     135                }
     136        }()
     137        return &rateLimiter{
     138                C: ch,
     139                ticker: ticker,
     140                stopped: stopped,
     141        }
     142}
     143
     144func (rl *rateLimiter) Stop() {
     145        rl.ticker.Stop()
     146        close(rl.stopped)
     147}
     148
     149type connOptions struct {
     150        Logger Logger
     151        RateLimitDelay time.Duration
     152        RateLimitBurst int
     153}
     154
    109155type conn struct {
    110156        conn   ircConn
     
    117163}
    118164
    119 func newConn(srv *Server, ic ircConn, logger Logger) *conn {
     165func newConn(srv *Server, ic ircConn, options *connOptions) *conn {
    120166        outgoing := make(chan *irc.Message, 64)
    121167        c := &conn{
     
    123169                srv:      srv,
    124170                outgoing: outgoing,
    125                 logger:   logger,
     171                logger:   options.Logger,
    126172        }
    127173
    128174        go func() {
     175                var rl *rateLimiter
     176                if options.RateLimitDelay > 0 && options.RateLimitBurst > 0 {
     177                        rl = newRateLimiter(options.RateLimitDelay, options.RateLimitBurst)
     178                        defer rl.Stop()
     179                }
     180
    129181                for msg := range outgoing {
     182                        if rl != nil {
     183                                <-rl.C
     184                        }
     185
    130186                        if c.srv.Debug {
    131187                                c.logger.Printf("sent: %v", msg)
  • trunk/downstream.go

    r387 r398  
    103103        remoteAddr := ic.RemoteAddr().String()
    104104        logger := &prefixLogger{srv.Logger, fmt.Sprintf("downstream %q: ", remoteAddr)}
     105        options := connOptions{Logger: logger}
    105106        dc := &downstreamConn{
    106                 conn:          *newConn(srv, ic, logger),
     107                conn:          *newConn(srv, ic, &options),
    107108                id:            id,
    108109                supportedCaps: make(map[string]string),
  • trunk/server.go

    r385 r398  
    1717
    1818// TODO: make configurable
    19 var retryConnectMinDelay = time.Minute
     19var retryConnectDelay = time.Minute
    2020var connectTimeout = 15 * time.Second
    2121var writeTimeout = 10 * time.Second
     22var upstreamMessageDelay = 2 * time.Second
     23var upstreamMessageBurst = 10
    2224
    2325type Logger interface {
  • trunk/upstream.go

    r396 r398  
    158158        }
    159159
     160        options := connOptions{
     161                Logger: logger,
     162                RateLimitDelay: upstreamMessageDelay,
     163                RateLimitBurst: upstreamMessageBurst,
     164        }
     165
    160166        uc := &upstreamConn{
    161                 conn:                     *newConn(network.user.srv, newNetIRCConn(netConn), logger),
     167                conn:                     *newConn(network.user.srv, newNetIRCConn(netConn), &options),
    162168                network:                  network,
    163169                user:                     network.user,
  • trunk/user.go

    r395 r398  
    121121                }
    122122
    123                 if dur := time.Now().Sub(lastTry); dur < retryConnectMinDelay {
    124                         delay := retryConnectMinDelay - dur
     123                if dur := time.Now().Sub(lastTry); dur < retryConnectDelay {
     124                        delay := retryConnectDelay - dur
    125125                        net.user.srv.Logger.Printf("waiting %v before trying to reconnect to %q", delay.Truncate(time.Second), net.Addr)
    126126                        time.Sleep(delay)
Note: See TracChangeset for help on using the changeset viewer.