source: code/trunk/db_sqlite.go@ 807

Last change on this file since 807 was 804, checked in by koizumi.aoi, 2 years ago

Drunk as I like

Signed-off-by: Aoi K <koizumi.aoi@…>

File size: 21.7 KB
Line 
1package suika
2
3import (
4 "context"
5 "database/sql"
6 "fmt"
7 "math"
8 "strings"
9 "sync"
10 "time"
11
12 _ "github.com/mattn/go-sqlite3"
13 "github.com/prometheus/client_golang/prometheus"
14 promcollectors "github.com/prometheus/client_golang/prometheus/collectors"
15)
16
17const sqliteQueryTimeout = 5 * time.Second
18
19const sqliteSchema = `
20CREATE TABLE User (
21 id INTEGER PRIMARY KEY,
22 username TEXT NOT NULL UNIQUE,
23 password TEXT,
24 admin INTEGER NOT NULL DEFAULT 0,
25 realname TEXT
26);
27
28CREATE TABLE Network (
29 id INTEGER PRIMARY KEY,
30 name TEXT,
31 user INTEGER NOT NULL,
32 addr TEXT NOT NULL,
33 nick TEXT,
34 username TEXT,
35 realname TEXT,
36 pass TEXT,
37 connect_commands TEXT,
38 sasl_mechanism TEXT,
39 sasl_plain_username TEXT,
40 sasl_plain_password TEXT,
41 sasl_external_cert BLOB,
42 sasl_external_key BLOB,
43 enabled INTEGER NOT NULL DEFAULT 1,
44 FOREIGN KEY(user) REFERENCES User(id),
45 UNIQUE(user, addr, nick),
46 UNIQUE(user, name)
47);
48
49CREATE TABLE Channel (
50 id INTEGER PRIMARY KEY,
51 network INTEGER NOT NULL,
52 name TEXT NOT NULL,
53 key TEXT,
54 detached INTEGER NOT NULL DEFAULT 0,
55 detached_internal_msgid TEXT,
56 relay_detached INTEGER NOT NULL DEFAULT 0,
57 reattach_on INTEGER NOT NULL DEFAULT 0,
58 detach_after INTEGER NOT NULL DEFAULT 0,
59 detach_on INTEGER NOT NULL DEFAULT 0,
60 FOREIGN KEY(network) REFERENCES Network(id),
61 UNIQUE(network, name)
62);
63
64CREATE TABLE DeliveryReceipt (
65 id INTEGER PRIMARY KEY,
66 network INTEGER NOT NULL,
67 target TEXT NOT NULL,
68 client TEXT,
69 internal_msgid TEXT NOT NULL,
70 FOREIGN KEY(network) REFERENCES Network(id),
71 UNIQUE(network, target, client)
72);
73
74CREATE TABLE ReadReceipt (
75 id INTEGER PRIMARY KEY,
76 network INTEGER NOT NULL,
77 target TEXT NOT NULL,
78 timestamp TEXT NOT NULL,
79 FOREIGN KEY(network) REFERENCES Network(id),
80 UNIQUE(network, target)
81);
82`
83
84var sqliteMigrations = []string{
85 "", // migration #0 is reserved for schema initialization
86 "ALTER TABLE Network ADD COLUMN connect_commands VARCHAR(1023)",
87 "ALTER TABLE Channel ADD COLUMN detached INTEGER NOT NULL DEFAULT 0",
88 "ALTER TABLE Network ADD COLUMN sasl_external_cert BLOB DEFAULT NULL",
89 "ALTER TABLE Network ADD COLUMN sasl_external_key BLOB DEFAULT NULL",
90 "ALTER TABLE User ADD COLUMN admin INTEGER NOT NULL DEFAULT 0",
91 `
92 CREATE TABLE UserNew (
93 id INTEGER PRIMARY KEY,
94 username VARCHAR(255) NOT NULL UNIQUE,
95 password VARCHAR(255),
96 admin INTEGER NOT NULL DEFAULT 0
97 );
98 INSERT INTO UserNew SELECT rowid, username, password, admin FROM User;
99 DROP TABLE User;
100 ALTER TABLE UserNew RENAME TO User;
101 `,
102 `
103 CREATE TABLE NetworkNew (
104 id INTEGER PRIMARY KEY,
105 name VARCHAR(255),
106 user INTEGER NOT NULL,
107 addr VARCHAR(255) NOT NULL,
108 nick VARCHAR(255) NOT NULL,
109 username VARCHAR(255),
110 realname VARCHAR(255),
111 pass VARCHAR(255),
112 connect_commands VARCHAR(1023),
113 sasl_mechanism VARCHAR(255),
114 sasl_plain_username VARCHAR(255),
115 sasl_plain_password VARCHAR(255),
116 sasl_external_cert BLOB DEFAULT NULL,
117 sasl_external_key BLOB DEFAULT NULL,
118 FOREIGN KEY(user) REFERENCES User(id),
119 UNIQUE(user, addr, nick),
120 UNIQUE(user, name)
121 );
122 INSERT INTO NetworkNew
123 SELECT Network.id, name, User.id as user, addr, nick,
124 Network.username, realname, pass, connect_commands,
125 sasl_mechanism, sasl_plain_username, sasl_plain_password,
126 sasl_external_cert, sasl_external_key
127 FROM Network
128 JOIN User ON Network.user = User.username;
129 DROP TABLE Network;
130 ALTER TABLE NetworkNew RENAME TO Network;
131 `,
132 `
133 ALTER TABLE Channel ADD COLUMN relay_detached INTEGER NOT NULL DEFAULT 0;
134 ALTER TABLE Channel ADD COLUMN reattach_on INTEGER NOT NULL DEFAULT 0;
135 ALTER TABLE Channel ADD COLUMN detach_after INTEGER NOT NULL DEFAULT 0;
136 ALTER TABLE Channel ADD COLUMN detach_on INTEGER NOT NULL DEFAULT 0;
137 `,
138 `
139 CREATE TABLE DeliveryReceipt (
140 id INTEGER PRIMARY KEY,
141 network INTEGER NOT NULL,
142 target VARCHAR(255) NOT NULL,
143 client VARCHAR(255),
144 internal_msgid VARCHAR(255) NOT NULL,
145 FOREIGN KEY(network) REFERENCES Network(id),
146 UNIQUE(network, target, client)
147 );
148 `,
149 "ALTER TABLE Channel ADD COLUMN detached_internal_msgid VARCHAR(255)",
150 "ALTER TABLE Network ADD COLUMN enabled INTEGER NOT NULL DEFAULT 1",
151 "ALTER TABLE User ADD COLUMN realname VARCHAR(255)",
152 `
153 CREATE TABLE NetworkNew (
154 id INTEGER PRIMARY KEY,
155 name TEXT,
156 user INTEGER NOT NULL,
157 addr TEXT NOT NULL,
158 nick TEXT,
159 username TEXT,
160 realname TEXT,
161 pass TEXT,
162 connect_commands TEXT,
163 sasl_mechanism TEXT,
164 sasl_plain_username TEXT,
165 sasl_plain_password TEXT,
166 sasl_external_cert BLOB,
167 sasl_external_key BLOB,
168 enabled INTEGER NOT NULL DEFAULT 1,
169 FOREIGN KEY(user) REFERENCES User(id),
170 UNIQUE(user, addr, nick),
171 UNIQUE(user, name)
172 );
173 INSERT INTO NetworkNew
174 SELECT id, name, user, addr, nick, username, realname, pass,
175 connect_commands, sasl_mechanism, sasl_plain_username,
176 sasl_plain_password, sasl_external_cert, sasl_external_key,
177 enabled
178 FROM Network;
179 DROP TABLE Network;
180 ALTER TABLE NetworkNew RENAME TO Network;
181 `,
182 `
183 CREATE TABLE ReadReceipt (
184 id INTEGER PRIMARY KEY,
185 network INTEGER NOT NULL,
186 target TEXT NOT NULL,
187 timestamp TEXT NOT NULL,
188 FOREIGN KEY(network) REFERENCES Network(id),
189 UNIQUE(network, target)
190 );
191 `,
192}
193
194type SqliteDB struct {
195 lock sync.RWMutex
196 db *sql.DB
197}
198
199func OpenSqliteDB(source string) (Database, error) {
200 sqlSqliteDB, err := sql.Open("sqlite3", source)
201 if err != nil {
202 return nil, err
203 }
204
205 db := &SqliteDB{db: sqlSqliteDB}
206 if err := db.upgrade(); err != nil {
207 sqlSqliteDB.Close()
208 return nil, err
209 }
210
211 return db, nil
212}
213
214func (db *SqliteDB) Close() error {
215 db.lock.Lock()
216 defer db.lock.Unlock()
217 return db.db.Close()
218}
219
220func (db *SqliteDB) upgrade() error {
221 db.lock.Lock()
222 defer db.lock.Unlock()
223
224 var version int
225 if err := db.db.QueryRow("PRAGMA user_version").Scan(&version); err != nil {
226 return fmt.Errorf("failed to query schema version: %v", err)
227 }
228
229 if version == len(sqliteMigrations) {
230 return nil
231 } else if version > len(sqliteMigrations) {
232 return fmt.Errorf("suika (version %d) older than schema (version %d)", len(sqliteMigrations), version)
233 }
234
235 tx, err := db.db.Begin()
236 if err != nil {
237 return err
238 }
239 defer tx.Rollback()
240
241 if version == 0 {
242 if _, err := tx.Exec(sqliteSchema); err != nil {
243 return fmt.Errorf("failed to initialize schema: %v", err)
244 }
245 } else {
246 for i := version; i < len(sqliteMigrations); i++ {
247 if _, err := tx.Exec(sqliteMigrations[i]); err != nil {
248 return fmt.Errorf("failed to execute migration #%v: %v", i, err)
249 }
250 }
251 }
252
253 // For some reason prepared statements don't work here
254 _, err = tx.Exec(fmt.Sprintf("PRAGMA user_version = %d", len(sqliteMigrations)))
255 if err != nil {
256 return fmt.Errorf("failed to bump schema version: %v", err)
257 }
258
259 return tx.Commit()
260}
261
262func (db *SqliteDB) MetricsCollector() prometheus.Collector {
263 return promcollectors.NewDBStatsCollector(db.db, "main")
264}
265
266func (db *SqliteDB) Stats(ctx context.Context) (*DatabaseStats, error) {
267 db.lock.RLock()
268 defer db.lock.RUnlock()
269
270 ctx, cancel := context.WithTimeout(ctx, sqliteQueryTimeout)
271 defer cancel()
272
273 var stats DatabaseStats
274 row := db.db.QueryRowContext(ctx, `SELECT
275 (SELECT COUNT(*) FROM User) AS users,
276 (SELECT COUNT(*) FROM Network) AS networks,
277 (SELECT COUNT(*) FROM Channel) AS channels`)
278 if err := row.Scan(&stats.Users, &stats.Networks, &stats.Channels); err != nil {
279 return nil, err
280 }
281
282 return &stats, nil
283}
284
285func toNullString(s string) sql.NullString {
286 return sql.NullString{
287 String: s,
288 Valid: s != "",
289 }
290}
291
292func (db *SqliteDB) ListUsers(ctx context.Context) ([]User, error) {
293 db.lock.RLock()
294 defer db.lock.RUnlock()
295
296 ctx, cancel := context.WithTimeout(ctx, sqliteQueryTimeout)
297 defer cancel()
298
299 rows, err := db.db.QueryContext(ctx,
300 "SELECT id, username, password, admin, realname FROM User")
301 if err != nil {
302 return nil, err
303 }
304 defer rows.Close()
305
306 var users []User
307 for rows.Next() {
308 var user User
309 var password, realname sql.NullString
310 if err := rows.Scan(&user.ID, &user.Username, &password, &user.Admin, &realname); err != nil {
311 return nil, err
312 }
313 user.Password = password.String
314 user.Realname = realname.String
315 users = append(users, user)
316 }
317 if err := rows.Err(); err != nil {
318 return nil, err
319 }
320
321 return users, nil
322}
323
324func (db *SqliteDB) GetUser(ctx context.Context, username string) (*User, error) {
325 db.lock.RLock()
326 defer db.lock.RUnlock()
327
328 ctx, cancel := context.WithTimeout(ctx, sqliteQueryTimeout)
329 defer cancel()
330
331 user := &User{Username: username}
332
333 var password, realname sql.NullString
334 row := db.db.QueryRowContext(ctx,
335 "SELECT id, password, admin, realname FROM User WHERE username = ?",
336 username)
337 if err := row.Scan(&user.ID, &password, &user.Admin, &realname); err != nil {
338 return nil, err
339 }
340 user.Password = password.String
341 user.Realname = realname.String
342 return user, nil
343}
344
345func (db *SqliteDB) StoreUser(ctx context.Context, user *User) error {
346 db.lock.Lock()
347 defer db.lock.Unlock()
348
349 ctx, cancel := context.WithTimeout(ctx, sqliteQueryTimeout)
350 defer cancel()
351
352 args := []interface{}{
353 sql.Named("username", user.Username),
354 sql.Named("password", toNullString(user.Password)),
355 sql.Named("admin", user.Admin),
356 sql.Named("realname", toNullString(user.Realname)),
357 }
358
359 var err error
360 if user.ID != 0 {
361 _, err = db.db.ExecContext(ctx, `
362 UPDATE User SET password = :password, admin = :admin,
363 realname = :realname WHERE username = :username`,
364 args...)
365 } else {
366 var res sql.Result
367 res, err = db.db.ExecContext(ctx, `
368 INSERT INTO
369 User(username, password, admin, realname)
370 VALUES (:username, :password, :admin, :realname)`,
371 args...)
372 if err != nil {
373 return err
374 }
375 user.ID, err = res.LastInsertId()
376 }
377
378 return err
379}
380
381func (db *SqliteDB) DeleteUser(ctx context.Context, id int64) error {
382 db.lock.Lock()
383 defer db.lock.Unlock()
384
385 ctx, cancel := context.WithTimeout(ctx, sqliteQueryTimeout)
386 defer cancel()
387
388 tx, err := db.db.Begin()
389 if err != nil {
390 return err
391 }
392 defer tx.Rollback()
393
394 _, err = tx.ExecContext(ctx, `DELETE FROM DeliveryReceipt
395 WHERE id IN (
396 SELECT DeliveryReceipt.id
397 FROM DeliveryReceipt
398 JOIN Network ON DeliveryReceipt.network = Network.id
399 WHERE Network.user = ?
400 )`, id)
401 if err != nil {
402 return err
403 }
404
405 _, err = tx.ExecContext(ctx, `DELETE FROM ReadReceipt
406 WHERE id IN (
407 SELECT ReadReceipt.id
408 FROM ReadReceipt
409 JOIN Network ON ReadReceipt.network = Network.id
410 WHERE Network.user = ?
411 )`, id)
412 if err != nil {
413 return err
414 }
415
416 _, err = tx.ExecContext(ctx, `DELETE FROM Channel
417 WHERE id IN (
418 SELECT Channel.id
419 FROM Channel
420 JOIN Network ON Channel.network = Network.id
421 WHERE Network.user = ?
422 )`, id)
423 if err != nil {
424 return err
425 }
426
427 _, err = tx.ExecContext(ctx, "DELETE FROM Network WHERE user = ?", id)
428 if err != nil {
429 return err
430 }
431
432 _, err = tx.ExecContext(ctx, "DELETE FROM User WHERE id = ?", id)
433 if err != nil {
434 return err
435 }
436
437 return tx.Commit()
438}
439
440func (db *SqliteDB) ListNetworks(ctx context.Context, userID int64) ([]Network, error) {
441 db.lock.RLock()
442 defer db.lock.RUnlock()
443
444 ctx, cancel := context.WithTimeout(ctx, sqliteQueryTimeout)
445 defer cancel()
446
447 rows, err := db.db.QueryContext(ctx, `
448 SELECT id, name, addr, nick, username, realname, pass,
449 connect_commands, sasl_mechanism, sasl_plain_username, sasl_plain_password,
450 sasl_external_cert, sasl_external_key, enabled
451 FROM Network
452 WHERE user = ?`,
453 userID)
454 if err != nil {
455 return nil, err
456 }
457 defer rows.Close()
458
459 var networks []Network
460 for rows.Next() {
461 var net Network
462 var name, nick, username, realname, pass, connectCommands sql.NullString
463 var saslMechanism, saslPlainUsername, saslPlainPassword sql.NullString
464 err := rows.Scan(&net.ID, &name, &net.Addr, &nick, &username, &realname,
465 &pass, &connectCommands, &saslMechanism, &saslPlainUsername, &saslPlainPassword,
466 &net.SASL.External.CertBlob, &net.SASL.External.PrivKeyBlob, &net.Enabled)
467 if err != nil {
468 return nil, err
469 }
470 net.Name = name.String
471 net.Nick = nick.String
472 net.Username = username.String
473 net.Realname = realname.String
474 net.Pass = pass.String
475 if connectCommands.Valid {
476 net.ConnectCommands = strings.Split(connectCommands.String, "\r\n")
477 }
478 net.SASL.Mechanism = saslMechanism.String
479 net.SASL.Plain.Username = saslPlainUsername.String
480 net.SASL.Plain.Password = saslPlainPassword.String
481 networks = append(networks, net)
482 }
483 if err := rows.Err(); err != nil {
484 return nil, err
485 }
486
487 return networks, nil
488}
489
490func (db *SqliteDB) StoreNetwork(ctx context.Context, userID int64, network *Network) error {
491 db.lock.Lock()
492 defer db.lock.Unlock()
493
494 ctx, cancel := context.WithTimeout(ctx, sqliteQueryTimeout)
495 defer cancel()
496
497 var saslMechanism, saslPlainUsername, saslPlainPassword sql.NullString
498 if network.SASL.Mechanism != "" {
499 saslMechanism = toNullString(network.SASL.Mechanism)
500 switch network.SASL.Mechanism {
501 case "PLAIN":
502 saslPlainUsername = toNullString(network.SASL.Plain.Username)
503 saslPlainPassword = toNullString(network.SASL.Plain.Password)
504 network.SASL.External.CertBlob = nil
505 network.SASL.External.PrivKeyBlob = nil
506 case "EXTERNAL":
507 // keep saslPlain* nil
508 default:
509 return fmt.Errorf("suika: cannot store network: unsupported SASL mechanism %q", network.SASL.Mechanism)
510 }
511 }
512
513 args := []interface{}{
514 sql.Named("name", toNullString(network.Name)),
515 sql.Named("addr", network.Addr),
516 sql.Named("nick", toNullString(network.Nick)),
517 sql.Named("username", toNullString(network.Username)),
518 sql.Named("realname", toNullString(network.Realname)),
519 sql.Named("pass", toNullString(network.Pass)),
520 sql.Named("connect_commands", toNullString(strings.Join(network.ConnectCommands, "\r\n"))),
521 sql.Named("sasl_mechanism", saslMechanism),
522 sql.Named("sasl_plain_username", saslPlainUsername),
523 sql.Named("sasl_plain_password", saslPlainPassword),
524 sql.Named("sasl_external_cert", network.SASL.External.CertBlob),
525 sql.Named("sasl_external_key", network.SASL.External.PrivKeyBlob),
526 sql.Named("enabled", network.Enabled),
527
528 sql.Named("id", network.ID), // only for UPDATE
529 sql.Named("user", userID), // only for INSERT
530 }
531
532 var err error
533 if network.ID != 0 {
534 _, err = db.db.ExecContext(ctx, `
535 UPDATE Network
536 SET name = :name, addr = :addr, nick = :nick, username = :username,
537 realname = :realname, pass = :pass, connect_commands = :connect_commands,
538 sasl_mechanism = :sasl_mechanism, sasl_plain_username = :sasl_plain_username, sasl_plain_password = :sasl_plain_password,
539 sasl_external_cert = :sasl_external_cert, sasl_external_key = :sasl_external_key,
540 enabled = :enabled
541 WHERE id = :id`, args...)
542 } else {
543 var res sql.Result
544 res, err = db.db.ExecContext(ctx, `
545 INSERT INTO Network(user, name, addr, nick, username, realname, pass,
546 connect_commands, sasl_mechanism, sasl_plain_username,
547 sasl_plain_password, sasl_external_cert, sasl_external_key, enabled)
548 VALUES (:user, :name, :addr, :nick, :username, :realname, :pass,
549 :connect_commands, :sasl_mechanism, :sasl_plain_username,
550 :sasl_plain_password, :sasl_external_cert, :sasl_external_key, :enabled)`,
551 args...)
552 if err != nil {
553 return err
554 }
555 network.ID, err = res.LastInsertId()
556 }
557 return err
558}
559
560func (db *SqliteDB) DeleteNetwork(ctx context.Context, id int64) error {
561 db.lock.Lock()
562 defer db.lock.Unlock()
563
564 ctx, cancel := context.WithTimeout(ctx, sqliteQueryTimeout)
565 defer cancel()
566
567 tx, err := db.db.Begin()
568 if err != nil {
569 return err
570 }
571 defer tx.Rollback()
572
573 _, err = tx.ExecContext(ctx, "DELETE FROM DeliveryReceipt WHERE network = ?", id)
574 if err != nil {
575 return err
576 }
577
578 _, err = tx.ExecContext(ctx, "DELETE FROM ReadReceipt WHERE network = ?", id)
579 if err != nil {
580 return err
581 }
582
583 _, err = tx.ExecContext(ctx, "DELETE FROM Channel WHERE network = ?", id)
584 if err != nil {
585 return err
586 }
587
588 _, err = tx.ExecContext(ctx, "DELETE FROM Network WHERE id = ?", id)
589 if err != nil {
590 return err
591 }
592
593 return tx.Commit()
594}
595
596func (db *SqliteDB) ListChannels(ctx context.Context, networkID int64) ([]Channel, error) {
597 db.lock.RLock()
598 defer db.lock.RUnlock()
599
600 ctx, cancel := context.WithTimeout(ctx, sqliteQueryTimeout)
601 defer cancel()
602
603 rows, err := db.db.QueryContext(ctx, `SELECT
604 id, name, key, detached, detached_internal_msgid,
605 relay_detached, reattach_on, detach_after, detach_on
606 FROM Channel
607 WHERE network = ?`, networkID)
608 if err != nil {
609 return nil, err
610 }
611 defer rows.Close()
612
613 var channels []Channel
614 for rows.Next() {
615 var ch Channel
616 var key, detachedInternalMsgID sql.NullString
617 var detachAfter int64
618 if err := rows.Scan(&ch.ID, &ch.Name, &key, &ch.Detached, &detachedInternalMsgID, &ch.RelayDetached, &ch.ReattachOn, &detachAfter, &ch.DetachOn); err != nil {
619 return nil, err
620 }
621 ch.Key = key.String
622 ch.DetachedInternalMsgID = detachedInternalMsgID.String
623 ch.DetachAfter = time.Duration(detachAfter) * time.Second
624 channels = append(channels, ch)
625 }
626 if err := rows.Err(); err != nil {
627 return nil, err
628 }
629
630 return channels, nil
631}
632
633func (db *SqliteDB) StoreChannel(ctx context.Context, networkID int64, ch *Channel) error {
634 db.lock.Lock()
635 defer db.lock.Unlock()
636
637 ctx, cancel := context.WithTimeout(ctx, sqliteQueryTimeout)
638 defer cancel()
639
640 args := []interface{}{
641 sql.Named("network", networkID),
642 sql.Named("name", ch.Name),
643 sql.Named("key", toNullString(ch.Key)),
644 sql.Named("detached", ch.Detached),
645 sql.Named("detached_internal_msgid", toNullString(ch.DetachedInternalMsgID)),
646 sql.Named("relay_detached", ch.RelayDetached),
647 sql.Named("reattach_on", ch.ReattachOn),
648 sql.Named("detach_after", int64(math.Ceil(ch.DetachAfter.Seconds()))),
649 sql.Named("detach_on", ch.DetachOn),
650
651 sql.Named("id", ch.ID), // only for UPDATE
652 }
653
654 var err error
655 if ch.ID != 0 {
656 _, err = db.db.ExecContext(ctx, `UPDATE Channel
657 SET network = :network, name = :name, key = :key, detached = :detached,
658 detached_internal_msgid = :detached_internal_msgid, relay_detached = :relay_detached,
659 reattach_on = :reattach_on, detach_after = :detach_after, detach_on = :detach_on
660 WHERE id = :id`, args...)
661 } else {
662 var res sql.Result
663 res, err = db.db.ExecContext(ctx, `INSERT INTO Channel(network, name, key, detached, detached_internal_msgid, relay_detached, reattach_on, detach_after, detach_on)
664 VALUES (:network, :name, :key, :detached, :detached_internal_msgid, :relay_detached, :reattach_on, :detach_after, :detach_on)`, args...)
665 if err != nil {
666 return err
667 }
668 ch.ID, err = res.LastInsertId()
669 }
670 return err
671}
672
673func (db *SqliteDB) DeleteChannel(ctx context.Context, id int64) error {
674 db.lock.Lock()
675 defer db.lock.Unlock()
676
677 ctx, cancel := context.WithTimeout(ctx, sqliteQueryTimeout)
678 defer cancel()
679
680 _, err := db.db.ExecContext(ctx, "DELETE FROM Channel WHERE id = ?", id)
681 return err
682}
683
684func (db *SqliteDB) ListDeliveryReceipts(ctx context.Context, networkID int64) ([]DeliveryReceipt, error) {
685 db.lock.RLock()
686 defer db.lock.RUnlock()
687
688 ctx, cancel := context.WithTimeout(ctx, sqliteQueryTimeout)
689 defer cancel()
690
691 rows, err := db.db.QueryContext(ctx, `
692 SELECT id, target, client, internal_msgid
693 FROM DeliveryReceipt
694 WHERE network = ?`, networkID)
695 if err != nil {
696 return nil, err
697 }
698 defer rows.Close()
699
700 var receipts []DeliveryReceipt
701 for rows.Next() {
702 var rcpt DeliveryReceipt
703 var client sql.NullString
704 if err := rows.Scan(&rcpt.ID, &rcpt.Target, &client, &rcpt.InternalMsgID); err != nil {
705 return nil, err
706 }
707 rcpt.Client = client.String
708 receipts = append(receipts, rcpt)
709 }
710 if err := rows.Err(); err != nil {
711 return nil, err
712 }
713
714 return receipts, nil
715}
716
717func (db *SqliteDB) StoreClientDeliveryReceipts(ctx context.Context, networkID int64, client string, receipts []DeliveryReceipt) error {
718 db.lock.Lock()
719 defer db.lock.Unlock()
720
721 ctx, cancel := context.WithTimeout(ctx, sqliteQueryTimeout)
722 defer cancel()
723
724 tx, err := db.db.Begin()
725 if err != nil {
726 return err
727 }
728 defer tx.Rollback()
729
730 _, err = tx.ExecContext(ctx, "DELETE FROM DeliveryReceipt WHERE network = ? AND client IS ?",
731 networkID, toNullString(client))
732 if err != nil {
733 return err
734 }
735
736 for i := range receipts {
737 rcpt := &receipts[i]
738
739 res, err := tx.ExecContext(ctx, `
740 INSERT INTO DeliveryReceipt(network, target, client, internal_msgid)
741 VALUES (:network, :target, :client, :internal_msgid)`,
742 sql.Named("network", networkID),
743 sql.Named("target", rcpt.Target),
744 sql.Named("client", toNullString(client)),
745 sql.Named("internal_msgid", rcpt.InternalMsgID))
746 if err != nil {
747 return err
748 }
749 rcpt.ID, err = res.LastInsertId()
750 if err != nil {
751 return err
752 }
753 }
754
755 return tx.Commit()
756}
757
758func (db *SqliteDB) GetReadReceipt(ctx context.Context, networkID int64, name string) (*ReadReceipt, error) {
759 db.lock.RLock()
760 defer db.lock.RUnlock()
761
762 ctx, cancel := context.WithTimeout(ctx, sqliteQueryTimeout)
763 defer cancel()
764
765 receipt := &ReadReceipt{
766 Target: name,
767 }
768
769 row := db.db.QueryRowContext(ctx, `
770 SELECT id, timestamp FROM ReadReceipt WHERE network = :network AND target = :target`,
771 sql.Named("network", networkID),
772 sql.Named("target", name),
773 )
774 var timestamp string
775 if err := row.Scan(&receipt.ID, &timestamp); err != nil {
776 if err == sql.ErrNoRows {
777 return nil, nil
778 }
779 return nil, err
780 }
781 if t, err := time.Parse(serverTimeLayout, timestamp); err != nil {
782 return nil, err
783 } else {
784 receipt.Timestamp = t
785 }
786 return receipt, nil
787}
788
789func (db *SqliteDB) StoreReadReceipt(ctx context.Context, networkID int64, receipt *ReadReceipt) error {
790 db.lock.Lock()
791 defer db.lock.Unlock()
792
793 ctx, cancel := context.WithTimeout(ctx, sqliteQueryTimeout)
794 defer cancel()
795
796 args := []interface{}{
797 sql.Named("id", receipt.ID),
798 sql.Named("timestamp", formatServerTime(receipt.Timestamp)),
799 sql.Named("network", networkID),
800 sql.Named("target", receipt.Target),
801 }
802
803 var err error
804 if receipt.ID != 0 {
805 _, err = db.db.ExecContext(ctx, `
806 UPDATE ReadReceipt SET timestamp = :timestamp WHERE id = :id`,
807 args...)
808 } else {
809 var res sql.Result
810 res, err = db.db.ExecContext(ctx, `
811 INSERT INTO
812 ReadReceipt(network, target, timestamp)
813 VALUES (:network, :target, :timestamp)`,
814 args...)
815 if err != nil {
816 return err
817 }
818 receipt.ID, err = res.LastInsertId()
819 }
820
821 return err
822}
Note: See TracBrowser for help on using the repository browser.