Changeset 781 in code
- Timestamp:
- Feb 11, 2022, 6:41:46 PM (3 years ago)
- Location:
- trunk
- Files:
-
- 1 added
- 7 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/bridge.go
r764 r781 2 2 3 3 import ( 4 "context" 5 "fmt" 4 6 "strconv" 5 7 "strings" … … 8 10 ) 9 11 10 func forwardChannel( dc *downstreamConn, ch *upstreamChannel) {12 func forwardChannel(ctx context.Context, dc *downstreamConn, ch *upstreamChannel) { 11 13 if !ch.complete { 12 14 panic("Tried to forward a partial channel") … … 17 19 sendTopic(dc, ch) 18 20 } 21 22 if dc.caps["soju.im/read"] { 23 channelCM := ch.conn.network.casemap(ch.Name) 24 r, err := dc.srv.db.GetReadReceipt(ctx, ch.conn.network.ID, channelCM) 25 if err != nil { 26 dc.logger.Printf("failed to get the read receipt for %q: %v", ch.Name, err) 27 } else { 28 timestampStr := "*" 29 if r != nil { 30 timestampStr = fmt.Sprintf("timestamp=%s", r.Timestamp.UTC().Format(serverTimeLayout)) 31 } 32 dc.SendMessage(&irc.Message{ 33 Prefix: dc.prefix(), 34 Command: "READ", 35 Params: []string{dc.marshalEntity(ch.conn.network, ch.Name), timestampStr}, 36 }) 37 } 38 } 39 19 40 sendNames(dc, ch) 20 41 } -
trunk/db.go
r712 r781 29 29 ListDeliveryReceipts(ctx context.Context, networkID int64) ([]DeliveryReceipt, error) 30 30 StoreClientDeliveryReceipts(ctx context.Context, networkID int64, client string, receipts []DeliveryReceipt) error 31 32 GetReadReceipt(ctx context.Context, networkID int64, name string) (*ReadReceipt, error) 33 StoreReadReceipt(ctx context.Context, networkID int64, receipt *ReadReceipt) error 31 34 } 32 35 … … 181 184 InternalMsgID string 182 185 } 186 187 type ReadReceipt struct { 188 ID int64 189 Target string // channel or nick 190 Timestamp time.Time 191 } -
trunk/db_postgres.go
r774 r781 78 78 UNIQUE(network, target, client) 79 79 ); 80 81 CREATE TABLE "ReadReceipt" ( 82 id SERIAL PRIMARY KEY, 83 network INTEGER NOT NULL REFERENCES "Network"(id) ON DELETE CASCADE, 84 target VARCHAR(255) NOT NULL, 85 timestamp TIMESTAMP WITH TIME ZONE NOT NULL, 86 UNIQUE(network, target) 87 ); 80 88 ` 81 89 … … 89 97 TYPE sasl_mechanism 90 98 USING sasl_mechanism::sasl_mechanism; 99 `, 100 ` 101 CREATE TABLE "ReadReceipt" ( 102 id SERIAL PRIMARY KEY, 103 network INTEGER NOT NULL REFERENCES "Network"(id) ON DELETE CASCADE, 104 target VARCHAR(255) NOT NULL, 105 timestamp TIMESTAMP WITH TIME ZONE NOT NULL, 106 UNIQUE(network, target) 107 ); 91 108 `, 92 109 } … … 501 518 return tx.Commit() 502 519 } 520 521 func (db *PostgresDB) GetReadReceipt(ctx context.Context, networkID int64, name string) (*ReadReceipt, error) { 522 ctx, cancel := context.WithTimeout(ctx, postgresQueryTimeout) 523 defer cancel() 524 525 receipt := &ReadReceipt{ 526 Target: name, 527 } 528 529 row := db.db.QueryRowContext(ctx, 530 `SELECT id, timestamp FROM "ReadReceipt" WHERE network = $1 AND target = $2`, 531 networkID, name) 532 if err := row.Scan(&receipt.ID, &receipt.Timestamp); err != nil { 533 if err == sql.ErrNoRows { 534 return nil, nil 535 } 536 return nil, err 537 } 538 return receipt, nil 539 } 540 541 func (db *PostgresDB) StoreReadReceipt(ctx context.Context, networkID int64, receipt *ReadReceipt) error { 542 ctx, cancel := context.WithTimeout(ctx, postgresQueryTimeout) 543 defer cancel() 544 545 var err error 546 if receipt.ID != 0 { 547 _, err = db.db.ExecContext(ctx, ` 548 UPDATE "ReadReceipt" 549 SET timestamp = $1 550 WHERE id = $2`, 551 receipt.Timestamp, receipt.ID) 552 } else { 553 err = db.db.QueryRowContext(ctx, ` 554 INSERT INTO "ReadReceipt" (network, target, timestamp) 555 VALUES ($1, $2, $3) 556 RETURNING id`, 557 networkID, receipt.Target, receipt.Timestamp).Scan(&receipt.ID) 558 } 559 return err 560 } -
trunk/db_sqlite.go
r712 r781 70 70 FOREIGN KEY(network) REFERENCES Network(id), 71 71 UNIQUE(network, target, client) 72 ); 73 74 CREATE TABLE ReadReceipt ( 75 id INTEGER PRIMARY KEY, 76 network INTEGER NOT NULL, 77 target TEXT NOT NULL, 78 timestamp TEXT NOT NULL, 79 FOREIGN KEY(network) REFERENCES Network(id), 80 UNIQUE(network, target) 72 81 ); 73 82 ` … … 171 180 ALTER TABLE NetworkNew RENAME TO Network; 172 181 `, 182 ` 183 CREATE TABLE ReadReceipt ( 184 id INTEGER PRIMARY KEY, 185 network INTEGER NOT NULL, 186 target TEXT NOT NULL, 187 timestamp TEXT NOT NULL, 188 FOREIGN KEY(network) REFERENCES Network(id), 189 UNIQUE(network, target) 190 ); 191 `, 173 192 } 174 193 … … 378 397 FROM DeliveryReceipt 379 398 JOIN Network ON DeliveryReceipt.network = Network.id 399 WHERE Network.user = ? 400 )`, id) 401 if err != nil { 402 return err 403 } 404 405 _, err = tx.ExecContext(ctx, `DELETE FROM ReadReceipt 406 WHERE id IN ( 407 SELECT ReadReceipt.id 408 FROM ReadReceipt 409 JOIN Network ON ReadReceipt.network = Network.id 380 410 WHERE Network.user = ? 381 411 )`, id) … … 546 576 } 547 577 578 _, err = tx.ExecContext(ctx, "DELETE FROM ReadReceipt WHERE network = ?", id) 579 if err != nil { 580 return err 581 } 582 548 583 _, err = tx.ExecContext(ctx, "DELETE FROM Channel WHERE network = ?", id) 549 584 if err != nil { … … 720 755 return tx.Commit() 721 756 } 757 758 func (db *SqliteDB) GetReadReceipt(ctx context.Context, networkID int64, name string) (*ReadReceipt, error) { 759 db.lock.RLock() 760 defer db.lock.RUnlock() 761 762 ctx, cancel := context.WithTimeout(ctx, sqliteQueryTimeout) 763 defer cancel() 764 765 receipt := &ReadReceipt{ 766 Target: name, 767 } 768 769 row := db.db.QueryRowContext(ctx, ` 770 SELECT id, timestamp FROM ReadReceipt WHERE network = :network AND target = :target`, 771 sql.Named("network", networkID), 772 sql.Named("target", name), 773 ) 774 var timestamp string 775 if err := row.Scan(&receipt.ID, ×tamp); err != nil { 776 if err == sql.ErrNoRows { 777 return nil, nil 778 } 779 return nil, err 780 } 781 if t, err := time.Parse(serverTimeLayout, timestamp); err != nil { 782 return nil, err 783 } else { 784 receipt.Timestamp = t 785 } 786 return receipt, nil 787 } 788 789 func (db *SqliteDB) StoreReadReceipt(ctx context.Context, networkID int64, receipt *ReadReceipt) error { 790 db.lock.Lock() 791 defer db.lock.Unlock() 792 793 ctx, cancel := context.WithTimeout(ctx, sqliteQueryTimeout) 794 defer cancel() 795 796 args := []interface{}{ 797 sql.Named("id", receipt.ID), 798 sql.Named("timestamp", receipt.Timestamp.UTC().Format(serverTimeLayout)), 799 sql.Named("network", networkID), 800 sql.Named("target", receipt.Target), 801 } 802 803 var err error 804 if receipt.ID != 0 { 805 _, err = db.db.ExecContext(ctx, ` 806 UPDATE ReadReceipt SET timestamp = :timestamp WHERE id = :id`, 807 args...) 808 } else { 809 var res sql.Result 810 res, err = db.db.ExecContext(ctx, ` 811 INSERT INTO 812 ReadReceipt(network, target, timestamp) 813 VALUES (:network, :target, :timestamp)`, 814 args...) 815 if err != nil { 816 return err 817 } 818 receipt.ID, err = res.LastInsertId() 819 } 820 821 return err 822 } -
trunk/downstream.go
r780 r781 226 226 "soju.im/bouncer-networks": "", 227 227 "soju.im/bouncer-networks-notify": "", 228 "soju.im/read": "", 228 229 } 229 230 … … 554 555 return 555 556 } 557 if msg.Command == "READ" && !dc.caps["soju.im/read"] { 558 return 559 } 556 560 557 561 dc.srv.metrics.downstreamOutMessagesTotal.Inc() … … 1470 1474 }) 1471 1475 1472 forwardChannel( dc, ch)1476 forwardChannel(ctx, dc, ch) 1473 1477 } 1474 1478 }) … … 1818 1822 ch.Key = key 1819 1823 } 1820 uc.network.attach(c h)1824 uc.network.attach(ctx, ch) 1821 1825 } else { 1822 1826 ch = &Channel{ … … 2751 2755 } 2752 2756 }) 2757 case "READ": 2758 var target, criteria string 2759 if err := parseMessageParams(msg, &target); err != nil { 2760 return ircError{&irc.Message{ 2761 Command: "FAIL", 2762 Params: []string{"READ", "NEED_MORE_PARAMS", "Missing parameters"}, 2763 }} 2764 } 2765 if len(msg.Params) > 1 { 2766 criteria = msg.Params[1] 2767 } 2768 2769 uc, entity, err := dc.unmarshalEntity(target) 2770 if err != nil { 2771 return err 2772 } 2773 entityCM := uc.network.casemap(entity) 2774 2775 r, err := dc.srv.db.GetReadReceipt(ctx, uc.network.ID, entityCM) 2776 if err != nil { 2777 dc.logger.Printf("failed to get the read receipt for %q: %v", entity, err) 2778 return ircError{&irc.Message{ 2779 Command: "FAIL", 2780 Params: []string{"READ", "INTERNAL_ERROR", target, "Internal error"}, 2781 }} 2782 } else if r == nil { 2783 r = &ReadReceipt{ 2784 Target: entityCM, 2785 } 2786 } 2787 2788 broadcast := false 2789 if len(criteria) > 0 { 2790 // TODO: support msgid criteria 2791 criteriaParts := strings.SplitN(criteria, "=", 2) 2792 if len(criteriaParts) != 2 || criteriaParts[0] != "timestamp" { 2793 return ircError{&irc.Message{ 2794 Command: "FAIL", 2795 Params: []string{"READ", "INVALID_PARAMS", criteria, "Unknown criteria"}, 2796 }} 2797 } 2798 2799 timestamp, err := time.Parse(serverTimeLayout, criteriaParts[1]) 2800 if err != nil { 2801 return ircError{&irc.Message{ 2802 Command: "FAIL", 2803 Params: []string{"READ", "INVALID_PARAMS", criteria, "Invalid criteria"}, 2804 }} 2805 } 2806 now := time.Now() 2807 if timestamp.After(now) { 2808 timestamp = now 2809 } 2810 if r.Timestamp.Before(timestamp) { 2811 r.Timestamp = timestamp 2812 if err := dc.srv.db.StoreReadReceipt(ctx, uc.network.ID, r); err != nil { 2813 dc.logger.Printf("failed to store receipt for %q: %v", entity, err) 2814 return ircError{&irc.Message{ 2815 Command: "FAIL", 2816 Params: []string{"READ", "INTERNAL_ERROR", target, "Internal error"}, 2817 }} 2818 } 2819 broadcast = true 2820 } 2821 } 2822 2823 timestampStr := "*" 2824 if !r.Timestamp.IsZero() { 2825 timestampStr = fmt.Sprintf("timestamp=%s", r.Timestamp.UTC().Format(serverTimeLayout)) 2826 } 2827 uc.forEachDownstream(func(d *downstreamConn) { 2828 if broadcast || dc.id == d.id { 2829 d.SendMessage(&irc.Message{ 2830 Prefix: d.prefix(), 2831 Command: "READ", 2832 Params: []string{d.marshalEntity(uc.network, entity), timestampStr}, 2833 }) 2834 } 2835 }) 2753 2836 case "BOUNCER": 2754 2837 var subcommand string -
trunk/upstream.go
r779 r781 1338 1338 if c == nil || !c.Detached { 1339 1339 uc.forEachDownstream(func(dc *downstreamConn) { 1340 forwardChannel( dc, ch)1340 forwardChannel(ctx, dc, ch) 1341 1341 }) 1342 1342 } … … 1769 1769 } 1770 1770 if ch.ReattachOn == FilterMessage || (ch.ReattachOn == FilterHighlight && uc.network.isHighlight(msg)) { 1771 uc.network.attach(c h)1771 uc.network.attach(ctx, ch) 1772 1772 if err := uc.srv.db.StoreChannel(ctx, uc.network.ID, ch); err != nil { 1773 1773 uc.logger.Printf("failed to update channel %q: %v", ch.Name, err) -
trunk/user.go
r777 r781 304 304 } 305 305 306 func (net *network) attach(c h *Channel) {306 func (net *network) attach(ctx context.Context, ch *Channel) { 307 307 if !ch.Detached { 308 308 return … … 330 330 331 331 if uch != nil { 332 forwardChannel( dc, uch)332 forwardChannel(ctx, dc, uch) 333 333 } 334 334 335 335 if detachedMsgID != "" { 336 dc.sendTargetBacklog(c ontext.TODO(), net, ch.Name, detachedMsgID)336 dc.sendTargetBacklog(ctx, net, ch.Name, detachedMsgID) 337 337 } 338 338 })
Note:
See TracChangeset
for help on using the changeset viewer.