source: code/trunk/db_sqlite.go@ 783

Last change on this file since 783 was 781, checked in by delthas, 3 years ago

Add support for the wip soju.im/read capability and READ command

READ lets downstream clients share information between each other about
what messages have been read by other downstreams.

Each target/entity has an optional corresponding read receipt, which is
stored as a timestamp.

  • When a downstream sends `READ #chan timestamp=2020-01-01T01:23:45.000Z`, the read receipt for that target is set to that date
  • soju sends READ to downstreams:
    • on JOIN, if the client uses the soju.im/read capability
    • when the read receipt timestamp is set by any downstream

The read receipt date is clamped by the previous receipt date and the
current time.

File size: 21.8 KB
Rev Line 
[531]1package soju
2
3import (
[645]4 "context"
[531]5 "database/sql"
6 "fmt"
7 "math"
8 "strings"
9 "sync"
10 "time"
11
12 _ "github.com/mattn/go-sqlite3"
[712]13 "github.com/prometheus/client_golang/prometheus"
14 promcollectors "github.com/prometheus/client_golang/prometheus/collectors"
[531]15)
16
// sqliteQueryTimeout bounds each database operation: the context-taking
// SqliteDB methods in this file derive a context with this timeout before
// issuing their queries.
const sqliteQueryTimeout = 5 * time.Second
18
// sqliteSchema contains the CREATE TABLE statements for the complete schema
// (User, Network, Channel, DeliveryReceipt and ReadReceipt). upgrade executes
// it as a whole when the database reports user_version 0.
const sqliteSchema = `
CREATE TABLE User (
	id INTEGER PRIMARY KEY,
	username TEXT NOT NULL UNIQUE,
	password TEXT,
	admin INTEGER NOT NULL DEFAULT 0,
	realname TEXT
);

CREATE TABLE Network (
	id INTEGER PRIMARY KEY,
	name TEXT,
	user INTEGER NOT NULL,
	addr TEXT NOT NULL,
	nick TEXT,
	username TEXT,
	realname TEXT,
	pass TEXT,
	connect_commands TEXT,
	sasl_mechanism TEXT,
	sasl_plain_username TEXT,
	sasl_plain_password TEXT,
	sasl_external_cert BLOB,
	sasl_external_key BLOB,
	enabled INTEGER NOT NULL DEFAULT 1,
	FOREIGN KEY(user) REFERENCES User(id),
	UNIQUE(user, addr, nick),
	UNIQUE(user, name)
);

CREATE TABLE Channel (
	id INTEGER PRIMARY KEY,
	network INTEGER NOT NULL,
	name TEXT NOT NULL,
	key TEXT,
	detached INTEGER NOT NULL DEFAULT 0,
	detached_internal_msgid TEXT,
	relay_detached INTEGER NOT NULL DEFAULT 0,
	reattach_on INTEGER NOT NULL DEFAULT 0,
	detach_after INTEGER NOT NULL DEFAULT 0,
	detach_on INTEGER NOT NULL DEFAULT 0,
	FOREIGN KEY(network) REFERENCES Network(id),
	UNIQUE(network, name)
);

CREATE TABLE DeliveryReceipt (
	id INTEGER PRIMARY KEY,
	network INTEGER NOT NULL,
	target TEXT NOT NULL,
	client TEXT,
	internal_msgid TEXT NOT NULL,
	FOREIGN KEY(network) REFERENCES Network(id),
	UNIQUE(network, target, client)
);

CREATE TABLE ReadReceipt (
	id INTEGER PRIMARY KEY,
	network INTEGER NOT NULL,
	target TEXT NOT NULL,
	timestamp TEXT NOT NULL,
	FOREIGN KEY(network) REFERENCES Network(id),
	UNIQUE(network, target)
);
`
83
// sqliteMigrations is the ordered list of SQL scripts used to bring an
// existing database forward. upgrade reads PRAGMA user_version, executes the
// entries from that index up to the end of the slice, and then sets
// user_version to len(sqliteMigrations). The position of an entry is
// therefore significant: the index is the schema version it upgrades from.
var sqliteMigrations = []string{
	"", // migration #0 is reserved for schema initialization
	"ALTER TABLE Network ADD COLUMN connect_commands VARCHAR(1023)",
	"ALTER TABLE Channel ADD COLUMN detached INTEGER NOT NULL DEFAULT 0",
	"ALTER TABLE Network ADD COLUMN sasl_external_cert BLOB DEFAULT NULL",
	"ALTER TABLE Network ADD COLUMN sasl_external_key BLOB DEFAULT NULL",
	"ALTER TABLE User ADD COLUMN admin INTEGER NOT NULL DEFAULT 0",
	// Rebuild User with an explicit integer primary key, seeded from rowid.
	`
		CREATE TABLE UserNew (
			id INTEGER PRIMARY KEY,
			username VARCHAR(255) NOT NULL UNIQUE,
			password VARCHAR(255),
			admin INTEGER NOT NULL DEFAULT 0
		);
		INSERT INTO UserNew SELECT rowid, username, password, admin FROM User;
		DROP TABLE User;
		ALTER TABLE UserNew RENAME TO User;
	`,
	// Rebuild Network so that its user column holds User.id; the old rows
	// are matched through Network.user = User.username.
	`
		CREATE TABLE NetworkNew (
			id INTEGER PRIMARY KEY,
			name VARCHAR(255),
			user INTEGER NOT NULL,
			addr VARCHAR(255) NOT NULL,
			nick VARCHAR(255) NOT NULL,
			username VARCHAR(255),
			realname VARCHAR(255),
			pass VARCHAR(255),
			connect_commands VARCHAR(1023),
			sasl_mechanism VARCHAR(255),
			sasl_plain_username VARCHAR(255),
			sasl_plain_password VARCHAR(255),
			sasl_external_cert BLOB DEFAULT NULL,
			sasl_external_key BLOB DEFAULT NULL,
			FOREIGN KEY(user) REFERENCES User(id),
			UNIQUE(user, addr, nick),
			UNIQUE(user, name)
		);
		INSERT INTO NetworkNew
			SELECT Network.id, name, User.id as user, addr, nick,
				Network.username, realname, pass, connect_commands,
				sasl_mechanism, sasl_plain_username, sasl_plain_password,
				sasl_external_cert, sasl_external_key
			FROM Network
			JOIN User ON Network.user = User.username;
		DROP TABLE Network;
		ALTER TABLE NetworkNew RENAME TO Network;
	`,
	`
		ALTER TABLE Channel ADD COLUMN relay_detached INTEGER NOT NULL DEFAULT 0;
		ALTER TABLE Channel ADD COLUMN reattach_on INTEGER NOT NULL DEFAULT 0;
		ALTER TABLE Channel ADD COLUMN detach_after INTEGER NOT NULL DEFAULT 0;
		ALTER TABLE Channel ADD COLUMN detach_on INTEGER NOT NULL DEFAULT 0;
	`,
	`
		CREATE TABLE DeliveryReceipt (
			id INTEGER PRIMARY KEY,
			network INTEGER NOT NULL,
			target VARCHAR(255) NOT NULL,
			client VARCHAR(255),
			internal_msgid VARCHAR(255) NOT NULL,
			FOREIGN KEY(network) REFERENCES Network(id),
			UNIQUE(network, target, client)
		);
	`,
	"ALTER TABLE Channel ADD COLUMN detached_internal_msgid VARCHAR(255)",
	"ALTER TABLE Network ADD COLUMN enabled INTEGER NOT NULL DEFAULT 1",
	"ALTER TABLE User ADD COLUMN realname VARCHAR(255)",
	// Rebuild Network with TEXT columns; nick is no longer NOT NULL.
	`
		CREATE TABLE NetworkNew (
			id INTEGER PRIMARY KEY,
			name TEXT,
			user INTEGER NOT NULL,
			addr TEXT NOT NULL,
			nick TEXT,
			username TEXT,
			realname TEXT,
			pass TEXT,
			connect_commands TEXT,
			sasl_mechanism TEXT,
			sasl_plain_username TEXT,
			sasl_plain_password TEXT,
			sasl_external_cert BLOB,
			sasl_external_key BLOB,
			enabled INTEGER NOT NULL DEFAULT 1,
			FOREIGN KEY(user) REFERENCES User(id),
			UNIQUE(user, addr, nick),
			UNIQUE(user, name)
		);
		INSERT INTO NetworkNew
			SELECT id, name, user, addr, nick, username, realname, pass,
				connect_commands, sasl_mechanism, sasl_plain_username,
				sasl_plain_password, sasl_external_cert, sasl_external_key,
				enabled
			FROM Network;
		DROP TABLE Network;
		ALTER TABLE NetworkNew RENAME TO Network;
	`,
	`
		CREATE TABLE ReadReceipt (
			id INTEGER PRIMARY KEY,
			network INTEGER NOT NULL,
			target TEXT NOT NULL,
			timestamp TEXT NOT NULL,
			FOREIGN KEY(network) REFERENCES Network(id),
			UNIQUE(network, target)
		);
	`,
}
193
// SqliteDB is a database handle opened through database/sql with the
// "sqlite3" driver (see OpenSqliteDB).
type SqliteDB struct {
	// lock is taken for writing by the methods that modify the database
	// (and by Close and upgrade) and for reading by the query-only methods.
	lock sync.RWMutex
	// db is the underlying database/sql connection pool.
	db *sql.DB
}
198
[620]199func OpenSqliteDB(source string) (Database, error) {
200 sqlSqliteDB, err := sql.Open("sqlite3", source)
[531]201 if err != nil {
202 return nil, err
203 }
204
205 db := &SqliteDB{db: sqlSqliteDB}
206 if err := db.upgrade(); err != nil {
[588]207 sqlSqliteDB.Close()
[531]208 return nil, err
209 }
210
211 return db, nil
212}
213
214func (db *SqliteDB) Close() error {
215 db.lock.Lock()
216 defer db.lock.Unlock()
217 return db.db.Close()
218}
219
220func (db *SqliteDB) upgrade() error {
221 db.lock.Lock()
222 defer db.lock.Unlock()
223
224 var version int
225 if err := db.db.QueryRow("PRAGMA user_version").Scan(&version); err != nil {
226 return fmt.Errorf("failed to query schema version: %v", err)
227 }
228
229 if version == len(sqliteMigrations) {
230 return nil
231 } else if version > len(sqliteMigrations) {
232 return fmt.Errorf("soju (version %d) older than schema (version %d)", len(sqliteMigrations), version)
233 }
234
235 tx, err := db.db.Begin()
236 if err != nil {
237 return err
238 }
239 defer tx.Rollback()
240
241 if version == 0 {
242 if _, err := tx.Exec(sqliteSchema); err != nil {
243 return fmt.Errorf("failed to initialize schema: %v", err)
244 }
245 } else {
246 for i := version; i < len(sqliteMigrations); i++ {
247 if _, err := tx.Exec(sqliteMigrations[i]); err != nil {
248 return fmt.Errorf("failed to execute migration #%v: %v", i, err)
249 }
250 }
251 }
252
253 // For some reason prepared statements don't work here
254 _, err = tx.Exec(fmt.Sprintf("PRAGMA user_version = %d", len(sqliteMigrations)))
255 if err != nil {
256 return fmt.Errorf("failed to bump schema version: %v", err)
257 }
258
259 return tx.Commit()
260}
261
[712]262func (db *SqliteDB) MetricsCollector() prometheus.Collector {
263 return promcollectors.NewDBStatsCollector(db.db, "main")
264}
265
[652]266func (db *SqliteDB) Stats(ctx context.Context) (*DatabaseStats, error) {
[607]267 db.lock.RLock()
268 defer db.lock.RUnlock()
269
[652]270 ctx, cancel := context.WithTimeout(ctx, sqliteQueryTimeout)
[645]271 defer cancel()
272
[607]273 var stats DatabaseStats
[645]274 row := db.db.QueryRowContext(ctx, `SELECT
[607]275 (SELECT COUNT(*) FROM User) AS users,
276 (SELECT COUNT(*) FROM Network) AS networks,
277 (SELECT COUNT(*) FROM Channel) AS channels`)
278 if err := row.Scan(&stats.Users, &stats.Networks, &stats.Channels); err != nil {
279 return nil, err
280 }
281
282 return &stats, nil
283}
284
// toNullString converts s into a sql.NullString for storage: the empty
// string maps to SQL NULL (Valid == false), any other value is stored as-is.
func toNullString(s string) sql.NullString {
	return sql.NullString{String: s, Valid: s != ""}
}
291
[652]292func (db *SqliteDB) ListUsers(ctx context.Context) ([]User, error) {
[531]293 db.lock.RLock()
294 defer db.lock.RUnlock()
295
[652]296 ctx, cancel := context.WithTimeout(ctx, sqliteQueryTimeout)
[645]297 defer cancel()
298
299 rows, err := db.db.QueryContext(ctx,
300 "SELECT id, username, password, admin, realname FROM User")
[531]301 if err != nil {
302 return nil, err
303 }
304 defer rows.Close()
305
306 var users []User
307 for rows.Next() {
308 var user User
[598]309 var password, realname sql.NullString
310 if err := rows.Scan(&user.ID, &user.Username, &password, &user.Admin, &realname); err != nil {
[531]311 return nil, err
312 }
313 user.Password = password.String
[598]314 user.Realname = realname.String
[531]315 users = append(users, user)
316 }
317 if err := rows.Err(); err != nil {
318 return nil, err
319 }
320
321 return users, nil
322}
323
[652]324func (db *SqliteDB) GetUser(ctx context.Context, username string) (*User, error) {
[531]325 db.lock.RLock()
326 defer db.lock.RUnlock()
327
[652]328 ctx, cancel := context.WithTimeout(ctx, sqliteQueryTimeout)
[645]329 defer cancel()
330
[531]331 user := &User{Username: username}
332
[568]333 var password, realname sql.NullString
[645]334 row := db.db.QueryRowContext(ctx,
335 "SELECT id, password, admin, realname FROM User WHERE username = ?",
336 username)
[568]337 if err := row.Scan(&user.ID, &password, &user.Admin, &realname); err != nil {
[531]338 return nil, err
339 }
340 user.Password = password.String
[568]341 user.Realname = realname.String
[531]342 return user, nil
343}
344
[652]345func (db *SqliteDB) StoreUser(ctx context.Context, user *User) error {
[531]346 db.lock.Lock()
347 defer db.lock.Unlock()
348
[652]349 ctx, cancel := context.WithTimeout(ctx, sqliteQueryTimeout)
[645]350 defer cancel()
351
[596]352 args := []interface{}{
353 sql.Named("username", user.Username),
354 sql.Named("password", toNullString(user.Password)),
355 sql.Named("admin", user.Admin),
356 sql.Named("realname", toNullString(user.Realname)),
357 }
[531]358
359 var err error
360 if user.ID != 0 {
[645]361 _, err = db.db.ExecContext(ctx, `
362 UPDATE User SET password = :password, admin = :admin,
363 realname = :realname WHERE username = :username`,
364 args...)
[531]365 } else {
366 var res sql.Result
[645]367 res, err = db.db.ExecContext(ctx, `
368 INSERT INTO
369 User(username, password, admin, realname)
370 VALUES (:username, :password, :admin, :realname)`,
371 args...)
[531]372 if err != nil {
373 return err
374 }
375 user.ID, err = res.LastInsertId()
376 }
377
378 return err
379}
380
[652]381func (db *SqliteDB) DeleteUser(ctx context.Context, id int64) error {
[531]382 db.lock.Lock()
383 defer db.lock.Unlock()
384
[652]385 ctx, cancel := context.WithTimeout(ctx, sqliteQueryTimeout)
[645]386 defer cancel()
387
[531]388 tx, err := db.db.Begin()
389 if err != nil {
390 return err
391 }
392 defer tx.Rollback()
393
[645]394 _, err = tx.ExecContext(ctx, `DELETE FROM DeliveryReceipt
[595]395 WHERE id IN (
396 SELECT DeliveryReceipt.id
397 FROM DeliveryReceipt
398 JOIN Network ON DeliveryReceipt.network = Network.id
399 WHERE Network.user = ?
400 )`, id)
401 if err != nil {
402 return err
403 }
404
[781]405 _, err = tx.ExecContext(ctx, `DELETE FROM ReadReceipt
406 WHERE id IN (
407 SELECT ReadReceipt.id
408 FROM ReadReceipt
409 JOIN Network ON ReadReceipt.network = Network.id
410 WHERE Network.user = ?
411 )`, id)
412 if err != nil {
413 return err
414 }
415
[645]416 _, err = tx.ExecContext(ctx, `DELETE FROM Channel
[531]417 WHERE id IN (
418 SELECT Channel.id
419 FROM Channel
420 JOIN Network ON Channel.network = Network.id
421 WHERE Network.user = ?
422 )`, id)
423 if err != nil {
424 return err
425 }
426
[645]427 _, err = tx.ExecContext(ctx, "DELETE FROM Network WHERE user = ?", id)
[531]428 if err != nil {
429 return err
430 }
431
[645]432 _, err = tx.ExecContext(ctx, "DELETE FROM User WHERE id = ?", id)
[531]433 if err != nil {
434 return err
435 }
436
437 return tx.Commit()
438}
439
[652]440func (db *SqliteDB) ListNetworks(ctx context.Context, userID int64) ([]Network, error) {
[531]441 db.lock.RLock()
442 defer db.lock.RUnlock()
443
[652]444 ctx, cancel := context.WithTimeout(ctx, sqliteQueryTimeout)
[645]445 defer cancel()
446
447 rows, err := db.db.QueryContext(ctx, `
448 SELECT id, name, addr, nick, username, realname, pass,
[531]449 connect_commands, sasl_mechanism, sasl_plain_username, sasl_plain_password,
[542]450 sasl_external_cert, sasl_external_key, enabled
[531]451 FROM Network
452 WHERE user = ?`,
453 userID)
454 if err != nil {
455 return nil, err
456 }
457 defer rows.Close()
458
459 var networks []Network
460 for rows.Next() {
461 var net Network
[664]462 var name, nick, username, realname, pass, connectCommands sql.NullString
[531]463 var saslMechanism, saslPlainUsername, saslPlainPassword sql.NullString
[664]464 err := rows.Scan(&net.ID, &name, &net.Addr, &nick, &username, &realname,
[531]465 &pass, &connectCommands, &saslMechanism, &saslPlainUsername, &saslPlainPassword,
[542]466 &net.SASL.External.CertBlob, &net.SASL.External.PrivKeyBlob, &net.Enabled)
[531]467 if err != nil {
468 return nil, err
469 }
470 net.Name = name.String
[664]471 net.Nick = nick.String
[531]472 net.Username = username.String
473 net.Realname = realname.String
474 net.Pass = pass.String
475 if connectCommands.Valid {
476 net.ConnectCommands = strings.Split(connectCommands.String, "\r\n")
477 }
478 net.SASL.Mechanism = saslMechanism.String
479 net.SASL.Plain.Username = saslPlainUsername.String
480 net.SASL.Plain.Password = saslPlainPassword.String
481 networks = append(networks, net)
482 }
483 if err := rows.Err(); err != nil {
484 return nil, err
485 }
486
487 return networks, nil
488}
489
[652]490func (db *SqliteDB) StoreNetwork(ctx context.Context, userID int64, network *Network) error {
[531]491 db.lock.Lock()
492 defer db.lock.Unlock()
493
[652]494 ctx, cancel := context.WithTimeout(ctx, sqliteQueryTimeout)
[645]495 defer cancel()
496
[531]497 var saslMechanism, saslPlainUsername, saslPlainPassword sql.NullString
498 if network.SASL.Mechanism != "" {
499 saslMechanism = toNullString(network.SASL.Mechanism)
500 switch network.SASL.Mechanism {
501 case "PLAIN":
502 saslPlainUsername = toNullString(network.SASL.Plain.Username)
503 saslPlainPassword = toNullString(network.SASL.Plain.Password)
504 network.SASL.External.CertBlob = nil
505 network.SASL.External.PrivKeyBlob = nil
506 case "EXTERNAL":
507 // keep saslPlain* nil
508 default:
509 return fmt.Errorf("soju: cannot store network: unsupported SASL mechanism %q", network.SASL.Mechanism)
510 }
511 }
512
[596]513 args := []interface{}{
514 sql.Named("name", toNullString(network.Name)),
515 sql.Named("addr", network.Addr),
[664]516 sql.Named("nick", toNullString(network.Nick)),
[596]517 sql.Named("username", toNullString(network.Username)),
518 sql.Named("realname", toNullString(network.Realname)),
519 sql.Named("pass", toNullString(network.Pass)),
520 sql.Named("connect_commands", toNullString(strings.Join(network.ConnectCommands, "\r\n"))),
521 sql.Named("sasl_mechanism", saslMechanism),
522 sql.Named("sasl_plain_username", saslPlainUsername),
523 sql.Named("sasl_plain_password", saslPlainPassword),
524 sql.Named("sasl_external_cert", network.SASL.External.CertBlob),
525 sql.Named("sasl_external_key", network.SASL.External.PrivKeyBlob),
526 sql.Named("enabled", network.Enabled),
527
528 sql.Named("id", network.ID), // only for UPDATE
529 sql.Named("user", userID), // only for INSERT
530 }
531
[531]532 var err error
533 if network.ID != 0 {
[645]534 _, err = db.db.ExecContext(ctx, `
[596]535 UPDATE Network
536 SET name = :name, addr = :addr, nick = :nick, username = :username,
537 realname = :realname, pass = :pass, connect_commands = :connect_commands,
538 sasl_mechanism = :sasl_mechanism, sasl_plain_username = :sasl_plain_username, sasl_plain_password = :sasl_plain_password,
539 sasl_external_cert = :sasl_external_cert, sasl_external_key = :sasl_external_key,
540 enabled = :enabled
541 WHERE id = :id`, args...)
[531]542 } else {
543 var res sql.Result
[645]544 res, err = db.db.ExecContext(ctx, `
[596]545 INSERT INTO Network(user, name, addr, nick, username, realname, pass,
546 connect_commands, sasl_mechanism, sasl_plain_username,
[542]547 sasl_plain_password, sasl_external_cert, sasl_external_key, enabled)
[596]548 VALUES (:user, :name, :addr, :nick, :username, :realname, :pass,
549 :connect_commands, :sasl_mechanism, :sasl_plain_username,
550 :sasl_plain_password, :sasl_external_cert, :sasl_external_key, :enabled)`,
551 args...)
[531]552 if err != nil {
553 return err
554 }
555 network.ID, err = res.LastInsertId()
556 }
557 return err
558}
559
[652]560func (db *SqliteDB) DeleteNetwork(ctx context.Context, id int64) error {
[531]561 db.lock.Lock()
562 defer db.lock.Unlock()
563
[652]564 ctx, cancel := context.WithTimeout(ctx, sqliteQueryTimeout)
[645]565 defer cancel()
566
[531]567 tx, err := db.db.Begin()
568 if err != nil {
569 return err
570 }
571 defer tx.Rollback()
572
[645]573 _, err = tx.ExecContext(ctx, "DELETE FROM DeliveryReceipt WHERE network = ?", id)
[595]574 if err != nil {
575 return err
576 }
577
[781]578 _, err = tx.ExecContext(ctx, "DELETE FROM ReadReceipt WHERE network = ?", id)
579 if err != nil {
580 return err
581 }
582
[645]583 _, err = tx.ExecContext(ctx, "DELETE FROM Channel WHERE network = ?", id)
[531]584 if err != nil {
585 return err
586 }
587
[645]588 _, err = tx.ExecContext(ctx, "DELETE FROM Network WHERE id = ?", id)
[531]589 if err != nil {
590 return err
591 }
592
593 return tx.Commit()
594}
595
[652]596func (db *SqliteDB) ListChannels(ctx context.Context, networkID int64) ([]Channel, error) {
[531]597 db.lock.RLock()
598 defer db.lock.RUnlock()
599
[652]600 ctx, cancel := context.WithTimeout(ctx, sqliteQueryTimeout)
[645]601 defer cancel()
602
603 rows, err := db.db.QueryContext(ctx, `SELECT
[531]604 id, name, key, detached, detached_internal_msgid,
605 relay_detached, reattach_on, detach_after, detach_on
606 FROM Channel
607 WHERE network = ?`, networkID)
608 if err != nil {
609 return nil, err
610 }
611 defer rows.Close()
612
613 var channels []Channel
614 for rows.Next() {
615 var ch Channel
616 var key, detachedInternalMsgID sql.NullString
617 var detachAfter int64
618 if err := rows.Scan(&ch.ID, &ch.Name, &key, &ch.Detached, &detachedInternalMsgID, &ch.RelayDetached, &ch.ReattachOn, &detachAfter, &ch.DetachOn); err != nil {
619 return nil, err
620 }
621 ch.Key = key.String
622 ch.DetachedInternalMsgID = detachedInternalMsgID.String
623 ch.DetachAfter = time.Duration(detachAfter) * time.Second
624 channels = append(channels, ch)
625 }
626 if err := rows.Err(); err != nil {
627 return nil, err
628 }
629
630 return channels, nil
631}
632
[652]633func (db *SqliteDB) StoreChannel(ctx context.Context, networkID int64, ch *Channel) error {
[531]634 db.lock.Lock()
635 defer db.lock.Unlock()
636
[652]637 ctx, cancel := context.WithTimeout(ctx, sqliteQueryTimeout)
[645]638 defer cancel()
639
[596]640 args := []interface{}{
641 sql.Named("network", networkID),
642 sql.Named("name", ch.Name),
643 sql.Named("key", toNullString(ch.Key)),
644 sql.Named("detached", ch.Detached),
645 sql.Named("detached_internal_msgid", toNullString(ch.DetachedInternalMsgID)),
646 sql.Named("relay_detached", ch.RelayDetached),
647 sql.Named("reattach_on", ch.ReattachOn),
648 sql.Named("detach_after", int64(math.Ceil(ch.DetachAfter.Seconds()))),
649 sql.Named("detach_on", ch.DetachOn),
[531]650
[596]651 sql.Named("id", ch.ID), // only for UPDATE
652 }
653
[531]654 var err error
655 if ch.ID != 0 {
[645]656 _, err = db.db.ExecContext(ctx, `UPDATE Channel
[596]657 SET network = :network, name = :name, key = :key, detached = :detached,
658 detached_internal_msgid = :detached_internal_msgid, relay_detached = :relay_detached,
659 reattach_on = :reattach_on, detach_after = :detach_after, detach_on = :detach_on
660 WHERE id = :id`, args...)
[531]661 } else {
662 var res sql.Result
[645]663 res, err = db.db.ExecContext(ctx, `INSERT INTO Channel(network, name, key, detached, detached_internal_msgid, relay_detached, reattach_on, detach_after, detach_on)
[596]664 VALUES (:network, :name, :key, :detached, :detached_internal_msgid, :relay_detached, :reattach_on, :detach_after, :detach_on)`, args...)
[531]665 if err != nil {
666 return err
667 }
668 ch.ID, err = res.LastInsertId()
669 }
670 return err
671}
672
[652]673func (db *SqliteDB) DeleteChannel(ctx context.Context, id int64) error {
[531]674 db.lock.Lock()
675 defer db.lock.Unlock()
676
[652]677 ctx, cancel := context.WithTimeout(ctx, sqliteQueryTimeout)
[645]678 defer cancel()
679
680 _, err := db.db.ExecContext(ctx, "DELETE FROM Channel WHERE id = ?", id)
[531]681 return err
682}
683
[652]684func (db *SqliteDB) ListDeliveryReceipts(ctx context.Context, networkID int64) ([]DeliveryReceipt, error) {
[531]685 db.lock.RLock()
686 defer db.lock.RUnlock()
687
[652]688 ctx, cancel := context.WithTimeout(ctx, sqliteQueryTimeout)
[645]689 defer cancel()
690
691 rows, err := db.db.QueryContext(ctx, `
692 SELECT id, target, client, internal_msgid
[531]693 FROM DeliveryReceipt
694 WHERE network = ?`, networkID)
695 if err != nil {
696 return nil, err
697 }
698 defer rows.Close()
699
700 var receipts []DeliveryReceipt
701 for rows.Next() {
702 var rcpt DeliveryReceipt
703 var client sql.NullString
704 if err := rows.Scan(&rcpt.ID, &rcpt.Target, &client, &rcpt.InternalMsgID); err != nil {
705 return nil, err
706 }
707 rcpt.Client = client.String
708 receipts = append(receipts, rcpt)
709 }
710 if err := rows.Err(); err != nil {
711 return nil, err
712 }
713
714 return receipts, nil
715}
716
[652]717func (db *SqliteDB) StoreClientDeliveryReceipts(ctx context.Context, networkID int64, client string, receipts []DeliveryReceipt) error {
[531]718 db.lock.Lock()
719 defer db.lock.Unlock()
720
[652]721 ctx, cancel := context.WithTimeout(ctx, sqliteQueryTimeout)
[645]722 defer cancel()
723
[531]724 tx, err := db.db.Begin()
725 if err != nil {
726 return err
727 }
728 defer tx.Rollback()
729
[645]730 _, err = tx.ExecContext(ctx, "DELETE FROM DeliveryReceipt WHERE network = ? AND client IS ?",
[531]731 networkID, toNullString(client))
732 if err != nil {
733 return err
734 }
735
736 for i := range receipts {
737 rcpt := &receipts[i]
738
[645]739 res, err := tx.ExecContext(ctx, `
740 INSERT INTO DeliveryReceipt(network, target, client, internal_msgid)
[596]741 VALUES (:network, :target, :client, :internal_msgid)`,
742 sql.Named("network", networkID),
743 sql.Named("target", rcpt.Target),
744 sql.Named("client", toNullString(client)),
745 sql.Named("internal_msgid", rcpt.InternalMsgID))
[531]746 if err != nil {
747 return err
748 }
749 rcpt.ID, err = res.LastInsertId()
750 if err != nil {
751 return err
752 }
753 }
754
755 return tx.Commit()
756}
[781]757
758func (db *SqliteDB) GetReadReceipt(ctx context.Context, networkID int64, name string) (*ReadReceipt, error) {
759 db.lock.RLock()
760 defer db.lock.RUnlock()
761
762 ctx, cancel := context.WithTimeout(ctx, sqliteQueryTimeout)
763 defer cancel()
764
765 receipt := &ReadReceipt{
766 Target: name,
767 }
768
769 row := db.db.QueryRowContext(ctx, `
770 SELECT id, timestamp FROM ReadReceipt WHERE network = :network AND target = :target`,
771 sql.Named("network", networkID),
772 sql.Named("target", name),
773 )
774 var timestamp string
775 if err := row.Scan(&receipt.ID, &timestamp); err != nil {
776 if err == sql.ErrNoRows {
777 return nil, nil
778 }
779 return nil, err
780 }
781 if t, err := time.Parse(serverTimeLayout, timestamp); err != nil {
782 return nil, err
783 } else {
784 receipt.Timestamp = t
785 }
786 return receipt, nil
787}
788
789func (db *SqliteDB) StoreReadReceipt(ctx context.Context, networkID int64, receipt *ReadReceipt) error {
790 db.lock.Lock()
791 defer db.lock.Unlock()
792
793 ctx, cancel := context.WithTimeout(ctx, sqliteQueryTimeout)
794 defer cancel()
795
796 args := []interface{}{
797 sql.Named("id", receipt.ID),
798 sql.Named("timestamp", receipt.Timestamp.UTC().Format(serverTimeLayout)),
799 sql.Named("network", networkID),
800 sql.Named("target", receipt.Target),
801 }
802
803 var err error
804 if receipt.ID != 0 {
805 _, err = db.db.ExecContext(ctx, `
806 UPDATE ReadReceipt SET timestamp = :timestamp WHERE id = :id`,
807 args...)
808 } else {
809 var res sql.Result
810 res, err = db.db.ExecContext(ctx, `
811 INSERT INTO
812 ReadReceipt(network, target, timestamp)
813 VALUES (:network, :target, :timestamp)`,
814 args...)
815 if err != nil {
816 return err
817 }
818 receipt.ID, err = res.LastInsertId()
819 }
820
821 return err
822}
Note: See TracBrowser for help on using the repository browser.