Changeset 489 in code for trunk


Ignore:
Timestamp:
Mar 31, 2021, 4:04:13 PM (4 years ago)
Author:
contact
Message:

Save delivery receipts in DB

This avoids loosing history on restart for clients that don't
support chathistory.

Closes: https://todo.sr.ht/~emersion/soju/80

Location:
trunk
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • trunk/db.go

    r457 r489  
    119119        DetachAfter   time.Duration
    120120        DetachOn      MessageFilter
     121}
     122
     123type DeliveryReceipt struct {
     124        ID            int64
     125        Target        string // channel or nick
     126        Client        string
     127        InternalMsgID string
    121128}
    122129
     
    161168        FOREIGN KEY(network) REFERENCES Network(id),
    162169        UNIQUE(network, name)
     170);
     171
     172CREATE TABLE DeliveryReceipt (
     173        id INTEGER PRIMARY KEY,
     174        network INTEGER NOT NULL,
     175        target VARCHAR(255) NOT NULL,
     176        client VARCHAR(255),
     177        internal_msgid VARCHAR(255) NOT NULL,
     178        FOREIGN KEY(network) REFERENCES Network(id),
     179        UNIQUE(network, target, client)
    163180);
    164181`
     
    218235                ALTER TABLE Channel ADD COLUMN detach_on INTEGER NOT NULL DEFAULT 0;
    219236        `,
     237        `
     238                CREATE TABLE DeliveryReceipt (
     239                        id INTEGER PRIMARY KEY,
     240                        network INTEGER NOT NULL,
     241                        target VARCHAR(255) NOT NULL,
     242                        client VARCHAR(255),
     243                        internal_msgid VARCHAR(255) NOT NULL,
     244                        FOREIGN KEY(network) REFERENCES Network(id),
     245                        UNIQUE(network, target, client)
     246                );
     247        `,
    220248}
    221249
     
    579607        return err
    580608}
     609
     610func (db *DB) ListDeliveryReceipts(networkID int64) ([]DeliveryReceipt, error) {
     611        db.lock.RLock()
     612        defer db.lock.RUnlock()
     613
     614        rows, err := db.db.Query(`SELECT id, target, client, internal_msgid
     615                FROM DeliveryReceipt
     616                WHERE network = ?`, networkID)
     617        if err != nil {
     618                return nil, err
     619        }
     620        defer rows.Close()
     621
     622        var receipts []DeliveryReceipt
     623        for rows.Next() {
     624                var rcpt DeliveryReceipt
     625                var client sql.NullString
     626                if err := rows.Scan(&rcpt.ID, &rcpt.Target, &client, &rcpt.InternalMsgID); err != nil {
     627                        return nil, err
     628                }
     629                rcpt.Client = client.String
     630                receipts = append(receipts, rcpt)
     631        }
     632        if err := rows.Err(); err != nil {
     633                return nil, err
     634        }
     635
     636        return receipts, nil
     637}
     638
     639func (db *DB) StoreClientDeliveryReceipts(networkID int64, client string, receipts []DeliveryReceipt) error {
     640        db.lock.Lock()
     641        defer db.lock.Unlock()
     642
     643        tx, err := db.db.Begin()
     644        if err != nil {
     645                return err
     646        }
     647        defer tx.Rollback()
     648
     649        _, err = tx.Exec("DELETE FROM DeliveryReceipt WHERE network = ? AND client = ?",
     650                networkID, toNullString(client))
     651        if err != nil {
     652                return err
     653        }
     654
     655        for i := range receipts {
     656                rcpt := &receipts[i]
     657
     658                res, err := tx.Exec("INSERT INTO DeliveryReceipt(network, target, client, internal_msgid) VALUES (?, ?, ?, ?)",
     659                        networkID, rcpt.Target, toNullString(client), rcpt.InternalMsgID)
     660                if err != nil {
     661                        return err
     662                }
     663                rcpt.ID, err = res.LastInsertId()
     664                if err != nil {
     665                        return err
     666                }
     667        }
     668
     669        return tx.Commit()
     670}
  • trunk/upstream.go

    r487 r489  
    17531753                }
    17541754
    1755                 for clientName, _ := range uc.user.clientNames {
     1755                uc.network.delivered.ForEachClient(func(clientName string) {
    17561756                        uc.network.delivered.StoreID(entity, clientName, lastID)
    1757                 }
     1757                })
    17581758        }
    17591759
  • trunk/user.go

    r487 r489  
    9090        for _, entry := range ds.m.innerMap {
    9191                f(entry.originalKey)
     92        }
     93}
     94
     95func (ds deliveredStore) ForEachClient(f func(clientName string)) {
     96        clients := make(map[string]struct{})
     97        for _, entry := range ds.m.innerMap {
     98                delivered := entry.value.(deliveredClientMap)
     99                for clientName := range delivered {
     100                        clients[clientName] = struct{}{}
     101                }
     102        }
     103
     104        for clientName := range clients {
     105                f(clientName)
    92106        }
    93107}
     
    299313}
    300314
     315func (net *network) storeClientDeliveryReceipts(clientName string) {
     316        if !net.user.hasPersistentMsgStore() {
     317                return
     318        }
     319
     320        var receipts []DeliveryReceipt
     321        net.delivered.ForEachTarget(func(target string) {
     322                msgID := net.delivered.LoadID(target, clientName)
     323                if msgID == "" {
     324                        return
     325                }
     326                receipts = append(receipts, DeliveryReceipt{
     327                        Target:        target,
     328                        InternalMsgID: msgID,
     329                })
     330        })
     331
     332        if err := net.user.srv.db.StoreClientDeliveryReceipts(net.ID, clientName, receipts); err != nil {
     333                net.user.srv.Logger.Printf("failed to store delivery receipts for user %q, client %q, network %q: %v", net.user.Username, clientName, net.GetName(), err)
     334        }
     335}
     336
    301337type user struct {
    302338        User
     
    309345        downstreamConns []*downstreamConn
    310346        msgStore        messageStore
    311         clientNames     map[string]struct{}
    312347
    313348        // LIST commands in progress
     
    330365
    331366        return &user{
    332                 User:        *record,
    333                 srv:         srv,
    334                 events:      make(chan event, 64),
    335                 done:        make(chan struct{}),
    336                 msgStore:    msgStore,
    337                 clientNames: make(map[string]struct{}),
     367                User:     *record,
     368                srv:      srv,
     369                events:   make(chan event, 64),
     370                done:     make(chan struct{}),
     371                msgStore: msgStore,
    338372        }
    339373}
     
    407441                network := newNetwork(u, &record, channels)
    408442                u.networks = append(u.networks, network)
     443
     444                if u.hasPersistentMsgStore() {
     445                        receipts, err := u.srv.db.ListDeliveryReceipts(record.ID)
     446                        if err != nil {
     447                                u.srv.Logger.Printf("failed to load delivery receipts for user %q, network %q: %v", u.Username, network.GetName(), err)
     448                                return
     449                        }
     450
     451                        for _, rcpt := range receipts {
     452                                network.delivered.StoreID(rcpt.Target, rcpt.Client, rcpt.InternalMsgID)
     453                        }
     454                }
    409455
    410456                go network.run()
     
    490536                                uc.updateAway()
    491537                        })
    492 
    493                         u.clientNames[dc.clientName] = struct{}{}
    494538                case eventDownstreamDisconnected:
    495539                        dc := e.dc
     
    501545                                }
    502546                        }
     547
     548                        dc.forEachNetwork(func(net *network) {
     549                                net.storeClientDeliveryReceipts(dc.clientName)
     550                        })
    503551
    504552                        u.forEachUpstream(func(uc *upstreamConn) {
     
    525573                        for _, n := range u.networks {
    526574                                n.stop()
     575
     576                                n.delivered.ForEachClient(func(clientName string) {
     577                                        n.storeClientDeliveryReceipts(clientName)
     578                                })
    527579                        }
    528580                        return
     
    666718        <-u.done
    667719}
     720
     721func (u *user) hasPersistentMsgStore() bool {
     722        if u.msgStore == nil {
     723                return false
     724        }
     725        _, isMem := u.msgStore.(*memoryMessageStore)
     726        return !isMem
     727}
Note: See TracChangeset for help on using the changeset viewer.