Changeset 423 in code


Ignore:
Timestamp:
Oct 25, 2020, 4:47:38 PM (5 years ago)
Author:
contact
Message:

Add message store abstraction

Introduce a messageStore type, which will allow for multiple
implementations (e.g. in the DB or in-memory instead of on-disk).

The message store is per-user so that we don't need to deal with locking
and it's easier to implement per-user limits.

Location:
trunk
Files:
1 added
1 deleted
3 edited

Legend:

Unmodified
Added
Removed
  • trunk/downstream.go

    r421 r423  
    864864                        }
    865865
    866                         lastID, err := lastMsgID(net, target, time.Now())
     866                        lastID, err := dc.user.msgStore.LastMsgID(net, target, time.Now())
    867867                        if err != nil {
    868868                                dc.logger.Printf("failed to get last message ID: %v", err)
     
    877877
    878878func (dc *downstreamConn) sendNetworkHistory(net *network) {
    879         if dc.caps["draft/chathistory"] || dc.srv.LogPath == "" {
     879        if dc.caps["draft/chathistory"] || dc.user.msgStore == nil {
    880880                return
    881881        }
     
    891891
    892892                limit := 4000
    893                 history, err := loadHistoryLatestID(net, target, lastDelivered, limit)
     893                history, err := dc.user.msgStore.LoadLatestID(net, target, lastDelivered, limit)
    894894                if err != nil {
    895895                        dc.logger.Printf("failed to send implicit history for %q: %v", target, err)
     
    16021602                }
    16031603
    1604                 if dc.srv.LogPath == "" {
     1604                if dc.user.msgStore == nil {
    16051605                        return ircError{&irc.Message{
    16061606                                Command: irc.ERR_UNKNOWNCOMMAND,
     
    16421642                switch subcommand {
    16431643                case "BEFORE":
    1644                         history, err = loadHistoryBeforeTime(uc.network, entity, timestamp, limit)
     1644                        history, err = dc.user.msgStore.LoadBeforeTime(uc.network, entity, timestamp, limit)
    16451645                case "AFTER":
    1646                         history, err = loadHistoryAfterTime(uc.network, entity, timestamp, limit)
     1646                        history, err = dc.user.msgStore.LoadAfterTime(uc.network, entity, timestamp, limit)
    16471647                default:
    16481648                        // TODO: support LATEST, BETWEEN
  • trunk/upstream.go

    r419 r423  
    8282        // set of LIST commands in progress, per downstream
    8383        pendingLISTDownstreamSet map[uint64]struct{}
    84 
    85         messageLoggers map[string]*messageLogger
    8684}
    8785
     
    183181                availableMemberships:     stdMemberships,
    184182                pendingLISTDownstreamSet: make(map[uint64]struct{}),
    185                 messageLoggers:           make(map[string]*messageLogger),
    186183        }
    187184        return uc, nil
     
    16121609
    16131610func (uc *upstreamConn) appendLog(entity string, msg *irc.Message) {
    1614         if uc.srv.LogPath == "" {
     1611        if uc.user.msgStore == nil {
    16151612                return
    1616         }
    1617 
    1618         ml, ok := uc.messageLoggers[entity]
    1619         if !ok {
    1620                 ml = newMessageLogger(uc.network, entity)
    1621                 uc.messageLoggers[entity] = ml
    16221613        }
    16231614
     
    16291620        history, ok := uc.network.history[entity]
    16301621        if !ok {
    1631                 lastID, err := lastMsgID(uc.network, entity, time.Now())
     1622                lastID, err := uc.user.msgStore.LastMsgID(uc.network, entity, time.Now())
    16321623                if err != nil {
    16331624                        uc.logger.Printf("failed to log message: failed to get last message ID: %v", err)
     
    16531644        }
    16541645
    1655         msgID, err := ml.Append(msg)
     1646        msgID, err := uc.user.msgStore.Append(uc.network, entity, msg)
    16561647        if err != nil {
    16571648                uc.logger.Printf("failed to log message: %v", err)
  • trunk/user.go

    r421 r423  
    250250        networks        []*network
    251251        downstreamConns []*downstreamConn
     252        msgStore        *messageStore
    252253
    253254        // LIST commands in progress
     
    262263
    263264func newUser(srv *Server, record *User) *user {
     265        var msgStore *messageStore
     266        if srv.LogPath != "" {
     267                msgStore = newMessageStore(srv.LogPath, record.Username)
     268        }
     269
    264270        return &user{
    265                 User:   *record,
    266                 srv:    srv,
    267                 events: make(chan event, 64),
    268                 done:   make(chan struct{}),
     271                User:     *record,
     272                srv:      srv,
     273                events:   make(chan event, 64),
     274                done:     make(chan struct{}),
     275                msgStore: msgStore,
    269276        }
    270277}
     
    313320
    314321func (u *user) run() {
    315         defer close(u.done)
     322        defer func() {
     323                if u.msgStore != nil {
     324                        if err := u.msgStore.Close(); err != nil {
     325                                u.srv.Logger.Printf("failed to close message store for user %q: %v", u.Username, err)
     326                        }
     327                }
     328                close(u.done)
     329        }()
    316330
    317331        networks, err := u.srv.db.ListNetworks(u.ID)
     
    460474        uc.network.conn = nil
    461475
    462         for _, ml := range uc.messageLoggers {
    463                 if err := ml.Close(); err != nil {
    464                         uc.logger.Printf("failed to close message logger: %v", err)
    465                 }
    466         }
    467 
    468476        uc.endPendingLISTs(true)
    469477
Note: See TracChangeset for help on using the changeset viewer.