Changeset 253 in code for trunk/user.go


Ignore:
Timestamp:
Apr 10, 2020, 5:22:47 PM (5 years ago)
Author:
contact
Message:

Per-entity ring buffers

Instead of having one ring buffer per network, each network has one ring
buffer per entity (channel or nick). This allows history to be more
fair: if there's a lot of activity in a channel, it won't prune activity
in other channels.

We now track history sequence numbers per client and per network in
networkHistory. The overall list of offline clients is still tracked in
network.offlineClients.

When all clients have received history, the ring buffer can be released.

In the future, we should get rid of too-old offline clients to avoid
having to maintain history for them forever. We should also add a
per-user limit on the number of ring buffers.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/user.go

    r252 r253  
    4646}
    4747
     48type networkHistory struct {
     49        offlineClients map[string]uint64 // indexed by client name
     50        ring           *Ring             // can be nil if there are no offline clients
     51}
     52
    4853type network struct {
    4954        Network
    5055        user    *user
    51         ring    *Ring
    5256        stopped chan struct{}
    5357
    54         conn      *upstreamConn
    55         history   map[string]uint64
    56         lastError error
     58        conn           *upstreamConn
     59        history        map[string]*networkHistory // indexed by entity
     60        offlineClients map[string]struct{}        // indexed by client name
     61        lastError      error
    5762}
    5863
    5964func newNetwork(user *user, record *Network) *network {
    6065        return &network{
    61                 Network: *record,
    62                 user:    user,
    63                 ring:    NewRing(user.srv.RingCap),
    64                 stopped: make(chan struct{}),
    65                 history: make(map[string]uint64),
     66                Network:        *record,
     67                user:           user,
     68                stopped:        make(chan struct{}),
     69                history:        make(map[string]*networkHistory),
     70                offlineClients: make(map[string]struct{}),
    6671        }
    6772}
     
    295300                        dc := e.dc
    296301
    297                         dc.forEachNetwork(func(net *network) {
    298                                 seq := net.ring.Cur()
    299                                 net.history[dc.clientName] = seq
    300                         })
    301 
    302302                        for i := range u.downstreamConns {
    303303                                if u.downstreamConns[i] == dc {
     
    306306                                }
    307307                        }
     308
     309                        // Save history if we're the last client with this name
     310                        skipHistory := make(map[*network]bool)
     311                        u.forEachDownstream(func(conn *downstreamConn) {
     312                                if dc.clientName == conn.clientName {
     313                                        skipHistory[conn.network] = true
     314                                }
     315                        })
     316
     317                        dc.forEachNetwork(func(net *network) {
     318                                if skipHistory[net] || skipHistory[nil] {
     319                                        return
     320                                }
     321
     322                                net.offlineClients[dc.clientName] = struct{}{}
     323                                for _, history := range net.history {
     324                                        history.offlineClients[dc.clientName] = history.ring.Cur()
     325                                }
     326                        })
    308327
    309328                        u.forEachUpstream(func(uc *upstreamConn) {
Note: See TracChangeset for help on using the changeset viewer.