Changeset 409 in code for trunk


Ignore:
Timestamp:
Aug 20, 2020, 6:05:01 PM (5 years ago)
Author:
contact
Message:

Nuke in-memory ring buffer

Instead, always read chat history from logs. Unify the implicit chat
history (pushing history to clients) and explicit chat history
(via the CHATHISTORY command).

Instead of keeping track of ring buffer cursors for each client, use
message IDs.

If necessary, the ring buffer could be re-introduced behind a
common MessageStore interface (could be useful when on-disk logs are
disabled).

References: https://todo.sr.ht/~emersion/soju/80

Location:
trunk
Files:
1 deleted
4 edited

Legend:

Unmodified
Added
Removed
  • trunk/downstream.go

    r406 r409  
    851851                        delete(net.offlineClients, dc.clientName)
    852852                }
     853
     854                // Fast-forward history to last message
     855                for target, history := range net.history {
     856                        if ch, ok := net.channels[target]; ok && ch.Detached {
     857                                continue
     858                        }
     859
     860                        lastID, err := lastMsgID(net, target, time.Now())
     861                        if err != nil {
     862                                dc.logger.Printf("failed to get last message ID: %v", err)
     863                                continue
     864                        }
     865                        history.clients[dc.clientName] = lastID
     866                }
    853867        })
    854868
     
    857871
    858872func (dc *downstreamConn) sendNetworkHistory(net *network) {
    859         if dc.caps["draft/chathistory"] {
     873        if dc.caps["draft/chathistory"] || dc.srv.LogPath == "" {
    860874                return
    861875        }
     
    865879                }
    866880
    867                 seq, ok := history.clients[dc.clientName]
     881                lastDelivered, ok := history.clients[dc.clientName]
    868882                if !ok {
    869883                        continue
    870884                }
    871                 history.clients[dc.clientName] = history.ring.Cur()
    872 
    873                 consumer := history.ring.NewConsumer(seq)
     885
     886                limit := 4000
     887                history, err := loadHistoryLatestID(dc.network, target, lastDelivered, limit)
     888                if err != nil {
     889                        dc.logger.Printf("failed to send implicit history for %q: %v", target, err)
     890                        continue
     891                }
    874892
    875893                batchRef := "history"
     
    882900                }
    883901
    884                 for {
    885                         msg := consumer.Consume()
    886                         if msg == nil {
    887                                 break
    888                         }
    889 
     902                for _, msg := range history {
    890903                        // Don't replay all messages, because that would mess up client
    891904                        // state. For instance we just sent the list of users, sending
     
    901914
    902915                        if dc.caps["batch"] {
    903                                 msg = msg.Copy()
    904916                                msg.Tags["batch"] = irc.TagValue(batchRef)
    905917                        }
    906 
    907918                        dc.SendMessage(dc.marshalMessage(msg, net))
    908919                }
  • trunk/server.go

    r398 r409  
    4848        Hostname       string
    4949        Logger         Logger
    50         RingCap        int
    5150        HistoryLimit   int
    5251        LogPath        string
     
    6564        return &Server{
    6665                Logger:       log.New(log.Writer(), "", log.LstdFlags),
    67                 RingCap:      4096,
    6866                HistoryLimit: 1000,
    6967                users:        make(map[string]*user),
  • trunk/upstream.go

    r407 r409  
    709709                                ch.Members[newNick] = memberships
    710710                                uc.appendLog(ch.Name, msg)
    711                                 uc.appendHistory(ch.Name, msg)
    712711                        }
    713712                }
     
    819818
    820819                                uc.appendLog(ch.Name, msg)
    821                                 uc.appendHistory(ch.Name, msg)
    822820                        }
    823821                }
     
    16121610}
    16131611
    1614 // TODO: handle moving logs when a network name changes, when support for this is added
    16151612func (uc *upstreamConn) appendLog(entity string, msg *irc.Message) {
    16161613        if uc.srv.LogPath == "" {
     
    16241621        }
    16251622
    1626         if _, err := ml.Append(msg); err != nil {
    1627                 uc.logger.Printf("failed to log message: %v", err)
    1628         }
    1629 }
    1630 
    1631 // appendHistory appends a message to the history. entity can be empty.
    1632 func (uc *upstreamConn) appendHistory(entity string, msg *irc.Message) {
    16331623        detached := false
    16341624        if ch, ok := uc.network.channels[entity]; ok {
     
    16381628        history, ok := uc.network.history[entity]
    16391629        if !ok {
     1630                lastID, err := lastMsgID(uc.network, entity, time.Now())
     1631                if err != nil {
     1632                        uc.logger.Printf("failed to log message: failed to get last message ID: %v", err)
     1633                        return
     1634                }
     1635
    16401636                history = &networkHistory{
    1641                         clients: make(map[string]uint64),
    1642                         ring:    NewRing(uc.srv.RingCap),
     1637                        clients: make(map[string]string),
    16431638                }
    16441639                uc.network.history[entity] = history
    16451640
    16461641                for clientName, _ := range uc.network.offlineClients {
    1647                         history.clients[clientName] = 0
     1642                        history.clients[clientName] = lastID
    16481643                }
    16491644
     
    16521647                        // clients too
    16531648                        uc.forEachDownstream(func(dc *downstreamConn) {
    1654                                 history.clients[dc.clientName] = 0
    1655                         })
    1656                 }
    1657         }
    1658 
    1659         history.ring.Produce(msg)
     1649                                history.clients[dc.clientName] = lastID
     1650                        })
     1651                }
     1652        }
     1653
     1654        msgID, err := ml.Append(msg)
     1655        if err != nil {
     1656                uc.logger.Printf("failed to log message: %v", err)
     1657                return
     1658        }
    16601659
    16611660        if !detached {
    16621661                uc.forEachDownstream(func(dc *downstreamConn) {
    1663                         history.clients[dc.clientName] = history.ring.Cur()
    1664                 })
    1665         }
    1666 }
    1667 
    1668 // produce appends a message to the logs, adds it to the history and forwards
    1669 // it to connected downstream connections.
     1662                        history.clients[dc.clientName] = msgID
     1663                })
     1664        }
     1665}
     1666
     1667// produce appends a message to the logs and forwards it to connected downstream
     1668// connections.
    16701669//
    16711670// If origin is not nil and origin doesn't support echo-message, the message is
     
    16751674                uc.appendLog(target, msg)
    16761675        }
    1677 
    1678         uc.appendHistory(target, msg)
    16791676
    16801677        // Don't forward messages if it's a detached channel
  • trunk/user.go

    r406 r409  
    5252
    5353type networkHistory struct {
    54         clients map[string]uint64 // indexed by client name
    55         ring    *Ring             // can be nil if there are no offline clients
     54        clients map[string]string // indexed by client name
    5655}
    5756
Note: See TracChangeset for help on using the changeset viewer.