Changeset 253 in code for trunk


Ignore:
Timestamp:
Apr 10, 2020, 5:22:47 PM (5 years ago)
Author:
contact
Message:

Per-entity ring buffers

Instead of having one ring buffer per network, each network has one ring
buffer per entity (channel or nick). This allows history to be more
fair: if there's a lot of activity in a channel, it won't prune activity
in other channels.

We now track history sequence numbers per client and per network in
networkHistory. The overall list of offline clients is still tracked in
network.offlineClients.

When all clients have received history, the ring buffer can be released.

In the future, we should get rid of too-old offline clients to avoid
having to maintain history for them forever. We should also add a
per-user limit on the number of ring buffers.

Location:
trunk
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • trunk/downstream.go

    r249 r253  
    637637                return err
    638638        }
    639 
    640         // Only send history if we're the first connected client with that name and
    641         // network
    642         sendHistory := true
    643         dc.user.forEachDownstream(func(conn *downstreamConn) {
    644                 if dc.clientName == conn.clientName && dc.network == conn.network {
    645                         sendHistory = false
    646                 }
    647         })
    648639
    649640        dc.SendMessage(&irc.Message{
     
    689680
    690681        dc.forEachNetwork(func(net *network) {
    691                 seq, ok := net.history[dc.clientName]
    692                 if !sendHistory || !ok {
    693                         return
    694                 }
    695 
    696                 consumer := net.ring.NewConsumer(seq)
     682                // Only send history if we're the first connected client with that name
     683                // for the network
     684                if _, ok := net.offlineClients[dc.clientName]; ok {
     685                        dc.sendNetworkHistory(net)
     686                        delete(net.offlineClients, dc.clientName)
     687                }
     688        })
     689
     690        return nil
     691}
     692
     693func (dc *downstreamConn) sendNetworkHistory(net *network) {
     694        for target, history := range net.history {
     695                seq, ok := history.offlineClients[dc.clientName]
     696                if !ok {
     697                        continue
     698                }
     699                delete(history.offlineClients, dc.clientName)
     700
     701                // If all clients have received history, no need to keep the
     702                // ring buffer around
     703                if len(history.offlineClients) == 0 {
     704                        delete(net.history, target)
     705                }
     706
     707                consumer := history.ring.NewConsumer(seq)
    697708
    698709                // TODO: this means all history is lost when trying to send it while the
     
    726737                        dc.SendMessage(dc.marshalMessage(msg, uc))
    727738                }
    728         })
    729 
    730         return nil
     739        }
    731740}
    732741
  • trunk/upstream.go

    r248 r253  
    557557                                ch.Members[newNick] = membership
    558558                                uc.appendLog(ch.Name, msg)
     559                                uc.appendHistory(ch.Name, msg)
    559560                        }
    560561                }
    561562
    562563                if !me {
    563                         uc.network.ring.Produce(msg)
    564564                        uc.forEachDownstream(func(dc *downstreamConn) {
    565565                                dc.SendMessage(dc.marshalMessage(msg, uc))
     
    663663
    664664                                uc.appendLog(ch.Name, msg)
     665                                uc.appendHistory(ch.Name, msg)
    665666                        }
    666667                }
    667668
    668669                if msg.Prefix.Name != uc.nick {
    669                         uc.network.ring.Produce(msg)
    670670                        uc.forEachDownstream(func(dc *downstreamConn) {
    671671                                dc.SendMessage(dc.marshalMessage(msg, uc))
     
    12951295}
    12961296
     1297// appendHistory appends a message to the history. entity can be empty.
     1298func (uc *upstreamConn) appendHistory(entity string, msg *irc.Message) {
     1299        // If no client is offline, no need to append the message to the buffer
     1300        if len(uc.network.offlineClients) == 0 {
     1301                return
     1302        }
     1303
     1304        history, ok := uc.network.history[entity]
     1305        if !ok {
     1306                history = &networkHistory{
     1307                        offlineClients: make(map[string]uint64),
     1308                        ring:           NewRing(uc.srv.RingCap),
     1309                }
     1310                uc.network.history[entity] = history
     1311
     1312                for clientName, _ := range uc.network.offlineClients {
     1313                        history.offlineClients[clientName] = 0
     1314                }
     1315        }
     1316
     1317        history.ring.Produce(msg)
     1318}
     1319
    12971320// produce appends a message to the logs, adds it to the history and forwards
    12981321// it to connected downstream connections.
     
    13051328        }
    13061329
    1307         uc.network.ring.Produce(msg)
     1330        uc.appendHistory(target, msg)
    13081331
    13091332        uc.forEachDownstream(func(dc *downstreamConn) {
  • trunk/user.go

    r252 r253  
    4646}
    4747
     48type networkHistory struct {
     49        offlineClients map[string]uint64 // indexed by client name
     50        ring           *Ring             // can be nil if there are no offline clients
     51}
     52
    4853type network struct {
    4954        Network
    5055        user    *user
    51         ring    *Ring
    5256        stopped chan struct{}
    5357
    54         conn      *upstreamConn
    55         history   map[string]uint64
    56         lastError error
     58        conn           *upstreamConn
     59        history        map[string]*networkHistory // indexed by entity
     60        offlineClients map[string]struct{}        // indexed by client name
     61        lastError      error
    5762}
    5863
    5964func newNetwork(user *user, record *Network) *network {
    6065        return &network{
    61                 Network: *record,
    62                 user:    user,
    63                 ring:    NewRing(user.srv.RingCap),
    64                 stopped: make(chan struct{}),
    65                 history: make(map[string]uint64),
     66                Network:        *record,
     67                user:           user,
     68                stopped:        make(chan struct{}),
     69                history:        make(map[string]*networkHistory),
     70                offlineClients: make(map[string]struct{}),
    6671        }
    6772}
     
    295300                        dc := e.dc
    296301
    297                         dc.forEachNetwork(func(net *network) {
    298                                 seq := net.ring.Cur()
    299                                 net.history[dc.clientName] = seq
    300                         })
    301 
    302302                        for i := range u.downstreamConns {
    303303                                if u.downstreamConns[i] == dc {
     
    306306                                }
    307307                        }
     308
     309                        // Save history if we're the last client with this name
     310                        skipHistory := make(map[*network]bool)
     311                        u.forEachDownstream(func(conn *downstreamConn) {
     312                                if dc.clientName == conn.clientName {
     313                                        skipHistory[conn.network] = true
     314                                }
     315                        })
     316
     317                        dc.forEachNetwork(func(net *network) {
     318                                if skipHistory[net] || skipHistory[nil] {
     319                                        return
     320                                }
     321
     322                                net.offlineClients[dc.clientName] = struct{}{}
     323                                for _, history := range net.history {
     324                                        history.offlineClients[dc.clientName] = history.ring.Cur()
     325                                }
     326                        })
    308327
    309328                        u.forEachUpstream(func(uc *upstreamConn) {
Note: See TracChangeset for help on using the changeset viewer.