Changeset 781 in code for trunk


Ignore:
Timestamp:
Feb 11, 2022, 6:41:46 PM (3 years ago)
Author:
delthas
Message:

Add support for the wip soju.im/read capability and READ command

READ lets downstream clients share information between each other about
what messages have been read by other downstreams.

Each target/entity has an optional corresponding read receipt, which is
stored as a timestamp.

  • When a downstream sends: READ #chan timestamp=2020-01-01T01:23:45.000Z the read receipt for that target is set to that date
  • soju sends READ to downstreams:
    • on JOIN, if the client uses the soju.im/read capability
    • when the read receipt timestamp is set by any downstream

The read receipt date is clamped by the previous receipt date and the
current time.

Location:
trunk
Files:
1 added
7 edited

Legend:

Unmodified
Added
Removed
  • trunk/bridge.go

    r764 r781  
    22
    33import (
     4        "context"
     5        "fmt"
    46        "strconv"
    57        "strings"
     
    810)
    911
    10 func forwardChannel(dc *downstreamConn, ch *upstreamChannel) {
     12func forwardChannel(ctx context.Context, dc *downstreamConn, ch *upstreamChannel) {
    1113        if !ch.complete {
    1214                panic("Tried to forward a partial channel")
     
    1719                sendTopic(dc, ch)
    1820        }
     21
     22        if dc.caps["soju.im/read"] {
     23                channelCM := ch.conn.network.casemap(ch.Name)
     24                r, err := dc.srv.db.GetReadReceipt(ctx, ch.conn.network.ID, channelCM)
     25                if err != nil {
     26                        dc.logger.Printf("failed to get the read receipt for %q: %v", ch.Name, err)
     27                } else {
     28                        timestampStr := "*"
     29                        if r != nil {
     30                                timestampStr = fmt.Sprintf("timestamp=%s", r.Timestamp.UTC().Format(serverTimeLayout))
     31                        }
     32                        dc.SendMessage(&irc.Message{
     33                                Prefix:  dc.prefix(),
     34                                Command: "READ",
     35                                Params:  []string{dc.marshalEntity(ch.conn.network, ch.Name), timestampStr},
     36                        })
     37                }
     38        }
     39
    1940        sendNames(dc, ch)
    2041}
  • trunk/db.go

    r712 r781  
    2929        ListDeliveryReceipts(ctx context.Context, networkID int64) ([]DeliveryReceipt, error)
    3030        StoreClientDeliveryReceipts(ctx context.Context, networkID int64, client string, receipts []DeliveryReceipt) error
     31
     32        GetReadReceipt(ctx context.Context, networkID int64, name string) (*ReadReceipt, error)
     33        StoreReadReceipt(ctx context.Context, networkID int64, receipt *ReadReceipt) error
    3134}
    3235
     
    181184        InternalMsgID string
    182185}
     186
     187type ReadReceipt struct {
     188        ID        int64
     189        Target    string // channel or nick
     190        Timestamp time.Time
     191}
  • trunk/db_postgres.go

    r774 r781  
    7878        UNIQUE(network, target, client)
    7979);
     80
     81CREATE TABLE "ReadReceipt" (
     82        id SERIAL PRIMARY KEY,
     83        network INTEGER NOT NULL REFERENCES "Network"(id) ON DELETE CASCADE,
     84        target VARCHAR(255) NOT NULL,
     85        timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
     86        UNIQUE(network, target)
     87);
    8088`
    8189
     
    8997                        TYPE sasl_mechanism
    9098                        USING sasl_mechanism::sasl_mechanism;
     99        `,
     100        `
     101                CREATE TABLE "ReadReceipt" (
     102                        id SERIAL PRIMARY KEY,
     103                        network INTEGER NOT NULL REFERENCES "Network"(id) ON DELETE CASCADE,
     104                        target VARCHAR(255) NOT NULL,
     105                        timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
     106                        UNIQUE(network, target)
     107                );
    91108        `,
    92109}
     
    501518        return tx.Commit()
    502519}
     520
     521func (db *PostgresDB) GetReadReceipt(ctx context.Context, networkID int64, name string) (*ReadReceipt, error) {
     522        ctx, cancel := context.WithTimeout(ctx, postgresQueryTimeout)
     523        defer cancel()
     524
     525        receipt := &ReadReceipt{
     526                Target: name,
     527        }
     528
     529        row := db.db.QueryRowContext(ctx,
     530                `SELECT id, timestamp FROM "ReadReceipt" WHERE network = $1 AND target = $2`,
     531                networkID, name)
     532        if err := row.Scan(&receipt.ID, &receipt.Timestamp); err != nil {
     533                if err == sql.ErrNoRows {
     534                        return nil, nil
     535                }
     536                return nil, err
     537        }
     538        return receipt, nil
     539}
     540
     541func (db *PostgresDB) StoreReadReceipt(ctx context.Context, networkID int64, receipt *ReadReceipt) error {
     542        ctx, cancel := context.WithTimeout(ctx, postgresQueryTimeout)
     543        defer cancel()
     544
     545        var err error
     546        if receipt.ID != 0 {
     547                _, err = db.db.ExecContext(ctx, `
     548                        UPDATE "ReadReceipt"
     549                        SET timestamp = $1
     550                        WHERE id = $2`,
     551                        receipt.Timestamp, receipt.ID)
     552        } else {
     553                err = db.db.QueryRowContext(ctx, `
     554                        INSERT INTO "ReadReceipt" (network, target, timestamp)
     555                        VALUES ($1, $2, $3)
     556                        RETURNING id`,
     557                        networkID, receipt.Target, receipt.Timestamp).Scan(&receipt.ID)
     558        }
     559        return err
     560}
  • trunk/db_sqlite.go

    r712 r781  
    7070        FOREIGN KEY(network) REFERENCES Network(id),
    7171        UNIQUE(network, target, client)
     72);
     73
     74CREATE TABLE ReadReceipt (
     75        id INTEGER PRIMARY KEY,
     76        network INTEGER NOT NULL,
     77        target TEXT NOT NULL,
     78        timestamp TEXT NOT NULL,
     79        FOREIGN KEY(network) REFERENCES Network(id),
     80        UNIQUE(network, target)
    7281);
    7382`
     
    171180                ALTER TABLE NetworkNew RENAME TO Network;
    172181        `,
     182        `
     183                CREATE TABLE ReadReceipt (
     184                        id INTEGER PRIMARY KEY,
     185                        network INTEGER NOT NULL,
     186                        target TEXT NOT NULL,
     187                        timestamp TEXT NOT NULL,
     188                        FOREIGN KEY(network) REFERENCES Network(id),
     189                        UNIQUE(network, target)
     190                );
     191        `,
    173192}
    174193
     
    378397                        FROM DeliveryReceipt
    379398                        JOIN Network ON DeliveryReceipt.network = Network.id
     399                        WHERE Network.user = ?
     400                )`, id)
     401        if err != nil {
     402                return err
     403        }
     404
     405        _, err = tx.ExecContext(ctx, `DELETE FROM ReadReceipt
     406                WHERE id IN (
     407                        SELECT ReadReceipt.id
     408                        FROM ReadReceipt
     409                        JOIN Network ON ReadReceipt.network = Network.id
    380410                        WHERE Network.user = ?
    381411                )`, id)
     
    546576        }
    547577
     578        _, err = tx.ExecContext(ctx, "DELETE FROM ReadReceipt WHERE network = ?", id)
     579        if err != nil {
     580                return err
     581        }
     582
    548583        _, err = tx.ExecContext(ctx, "DELETE FROM Channel WHERE network = ?", id)
    549584        if err != nil {
     
    720755        return tx.Commit()
    721756}
     757
     758func (db *SqliteDB) GetReadReceipt(ctx context.Context, networkID int64, name string) (*ReadReceipt, error) {
     759        db.lock.RLock()
     760        defer db.lock.RUnlock()
     761
     762        ctx, cancel := context.WithTimeout(ctx, sqliteQueryTimeout)
     763        defer cancel()
     764
     765        receipt := &ReadReceipt{
     766                Target: name,
     767        }
     768
     769        row := db.db.QueryRowContext(ctx, `
     770                SELECT id, timestamp FROM ReadReceipt WHERE network = :network AND target = :target`,
     771                sql.Named("network", networkID),
     772                sql.Named("target", name),
     773        )
     774        var timestamp string
     775        if err := row.Scan(&receipt.ID, &timestamp); err != nil {
     776                if err == sql.ErrNoRows {
     777                        return nil, nil
     778                }
     779                return nil, err
     780        }
     781        if t, err := time.Parse(serverTimeLayout, timestamp); err != nil {
     782                return nil, err
     783        } else {
     784                receipt.Timestamp = t
     785        }
     786        return receipt, nil
     787}
     788
     789func (db *SqliteDB) StoreReadReceipt(ctx context.Context, networkID int64, receipt *ReadReceipt) error {
     790        db.lock.Lock()
     791        defer db.lock.Unlock()
     792
     793        ctx, cancel := context.WithTimeout(ctx, sqliteQueryTimeout)
     794        defer cancel()
     795
     796        args := []interface{}{
     797                sql.Named("id", receipt.ID),
     798                sql.Named("timestamp", receipt.Timestamp.UTC().Format(serverTimeLayout)),
     799                sql.Named("network", networkID),
     800                sql.Named("target", receipt.Target),
     801        }
     802
     803        var err error
     804        if receipt.ID != 0 {
     805                _, err = db.db.ExecContext(ctx, `
     806                        UPDATE ReadReceipt SET timestamp = :timestamp WHERE id = :id`,
     807                        args...)
     808        } else {
     809                var res sql.Result
     810                res, err = db.db.ExecContext(ctx, `
     811                        INSERT INTO
     812                        ReadReceipt(network, target, timestamp)
     813                        VALUES (:network, :target, :timestamp)`,
     814                        args...)
     815                if err != nil {
     816                        return err
     817                }
     818                receipt.ID, err = res.LastInsertId()
     819        }
     820
     821        return err
     822}
  • trunk/downstream.go

    r780 r781  
    226226        "soju.im/bouncer-networks":        "",
    227227        "soju.im/bouncer-networks-notify": "",
     228        "soju.im/read":                    "",
    228229}
    229230
     
    554555                return
    555556        }
     557        if msg.Command == "READ" && !dc.caps["soju.im/read"] {
     558                return
     559        }
    556560
    557561        dc.srv.metrics.downstreamOutMessagesTotal.Inc()
     
    14701474                        })
    14711475
    1472                         forwardChannel(dc, ch)
     1476                        forwardChannel(ctx, dc, ch)
    14731477                }
    14741478        })
     
    18181822                                        ch.Key = key
    18191823                                }
    1820                                 uc.network.attach(ch)
     1824                                uc.network.attach(ctx, ch)
    18211825                        } else {
    18221826                                ch = &Channel{
     
    27512755                        }
    27522756                })
     2757        case "READ":
     2758                var target, criteria string
     2759                if err := parseMessageParams(msg, &target); err != nil {
     2760                        return ircError{&irc.Message{
     2761                                Command: "FAIL",
     2762                                Params:  []string{"READ", "NEED_MORE_PARAMS", "Missing parameters"},
     2763                        }}
     2764                }
     2765                if len(msg.Params) > 1 {
     2766                        criteria = msg.Params[1]
     2767                }
     2768
     2769                uc, entity, err := dc.unmarshalEntity(target)
     2770                if err != nil {
     2771                        return err
     2772                }
     2773                entityCM := uc.network.casemap(entity)
     2774
     2775                r, err := dc.srv.db.GetReadReceipt(ctx, uc.network.ID, entityCM)
     2776                if err != nil {
     2777                        dc.logger.Printf("failed to get the read receipt for %q: %v", entity, err)
     2778                        return ircError{&irc.Message{
     2779                                Command: "FAIL",
     2780                                Params:  []string{"READ", "INTERNAL_ERROR", target, "Internal error"},
     2781                        }}
     2782                } else if r == nil {
     2783                        r = &ReadReceipt{
     2784                                Target: entityCM,
     2785                        }
     2786                }
     2787
     2788                broadcast := false
     2789                if len(criteria) > 0 {
     2790                        // TODO: support msgid criteria
     2791                        criteriaParts := strings.SplitN(criteria, "=", 2)
     2792                        if len(criteriaParts) != 2 || criteriaParts[0] != "timestamp" {
     2793                                return ircError{&irc.Message{
     2794                                        Command: "FAIL",
     2795                                        Params:  []string{"READ", "INVALID_PARAMS", criteria, "Unknown criteria"},
     2796                                }}
     2797                        }
     2798
     2799                        timestamp, err := time.Parse(serverTimeLayout, criteriaParts[1])
     2800                        if err != nil {
     2801                                return ircError{&irc.Message{
     2802                                        Command: "FAIL",
     2803                                        Params:  []string{"READ", "INVALID_PARAMS", criteria, "Invalid criteria"},
     2804                                }}
     2805                        }
     2806                        now := time.Now()
     2807                        if timestamp.After(now) {
     2808                                timestamp = now
     2809                        }
     2810                        if r.Timestamp.Before(timestamp) {
     2811                                r.Timestamp = timestamp
     2812                                if err := dc.srv.db.StoreReadReceipt(ctx, uc.network.ID, r); err != nil {
     2813                                        dc.logger.Printf("failed to store receipt for %q: %v", entity, err)
     2814                                        return ircError{&irc.Message{
     2815                                                Command: "FAIL",
     2816                                                Params:  []string{"READ", "INTERNAL_ERROR", target, "Internal error"},
     2817                                        }}
     2818                                }
     2819                                broadcast = true
     2820                        }
     2821                }
     2822
     2823                timestampStr := "*"
     2824                if !r.Timestamp.IsZero() {
     2825                        timestampStr = fmt.Sprintf("timestamp=%s", r.Timestamp.UTC().Format(serverTimeLayout))
     2826                }
     2827                uc.forEachDownstream(func(d *downstreamConn) {
     2828                        if broadcast || dc.id == d.id {
     2829                                d.SendMessage(&irc.Message{
     2830                                        Prefix:  d.prefix(),
     2831                                        Command: "READ",
     2832                                        Params:  []string{d.marshalEntity(uc.network, entity), timestampStr},
     2833                                })
     2834                        }
     2835                })
    27532836        case "BOUNCER":
    27542837                var subcommand string
  • trunk/upstream.go

    r779 r781  
    13381338                if c == nil || !c.Detached {
    13391339                        uc.forEachDownstream(func(dc *downstreamConn) {
    1340                                 forwardChannel(dc, ch)
     1340                                forwardChannel(ctx, dc, ch)
    13411341                        })
    13421342                }
     
    17691769        }
    17701770        if ch.ReattachOn == FilterMessage || (ch.ReattachOn == FilterHighlight && uc.network.isHighlight(msg)) {
    1771                 uc.network.attach(ch)
     1771                uc.network.attach(ctx, ch)
    17721772                if err := uc.srv.db.StoreChannel(ctx, uc.network.ID, ch); err != nil {
    17731773                        uc.logger.Printf("failed to update channel %q: %v", ch.Name, err)
  • trunk/user.go

    r777 r781  
    304304}
    305305
    306 func (net *network) attach(ch *Channel) {
     306func (net *network) attach(ctx context.Context, ch *Channel) {
    307307        if !ch.Detached {
    308308                return
     
    330330
    331331                if uch != nil {
    332                         forwardChannel(dc, uch)
     332                        forwardChannel(ctx, dc, uch)
    333333                }
    334334
    335335                if detachedMsgID != "" {
    336                         dc.sendTargetBacklog(context.TODO(), net, ch.Name, detachedMsgID)
     336                        dc.sendTargetBacklog(ctx, net, ch.Name, detachedMsgID)
    337337                }
    338338        })
Note: See TracChangeset for help on using the changeset viewer.