source: code/trunk/db_sqlite.go@ 772

Last change on this file since 772 was 712, checked in by contact, 4 years ago

Add Prometheus instrumentation for the database

File size: 19.4 KB
RevLine 
[531]1package soju
2
3import (
[645]4 "context"
[531]5 "database/sql"
6 "fmt"
7 "math"
8 "strings"
9 "sync"
10 "time"
11
12 _ "github.com/mattn/go-sqlite3"
[712]13 "github.com/prometheus/client_golang/prometheus"
14 promcollectors "github.com/prometheus/client_golang/prometheus/collectors"
[531]15)
16
// sqliteQueryTimeout is the deadline applied to the context of each SQLite
// operation issued by the methods in this file.
const sqliteQueryTimeout time.Duration = 5 * time.Second
18
// sqliteSchema holds the statements executed to create a brand new database
// (schema version 0). It must describe the same tables that result from
// applying every entry of sqliteMigrations in order.
const sqliteSchema = `
CREATE TABLE User (
	id INTEGER PRIMARY KEY,
	username TEXT NOT NULL UNIQUE,
	password TEXT,
	admin INTEGER NOT NULL DEFAULT 0,
	realname TEXT
);

CREATE TABLE Network (
	id INTEGER PRIMARY KEY,
	name TEXT,
	user INTEGER NOT NULL,
	addr TEXT NOT NULL,
	nick TEXT,
	username TEXT,
	realname TEXT,
	pass TEXT,
	connect_commands TEXT,
	sasl_mechanism TEXT,
	sasl_plain_username TEXT,
	sasl_plain_password TEXT,
	sasl_external_cert BLOB,
	sasl_external_key BLOB,
	enabled INTEGER NOT NULL DEFAULT 1,
	FOREIGN KEY(user) REFERENCES User(id),
	UNIQUE(user, addr, nick),
	UNIQUE(user, name)
);

CREATE TABLE Channel (
	id INTEGER PRIMARY KEY,
	network INTEGER NOT NULL,
	name TEXT NOT NULL,
	key TEXT,
	detached INTEGER NOT NULL DEFAULT 0,
	detached_internal_msgid TEXT,
	relay_detached INTEGER NOT NULL DEFAULT 0,
	reattach_on INTEGER NOT NULL DEFAULT 0,
	detach_after INTEGER NOT NULL DEFAULT 0,
	detach_on INTEGER NOT NULL DEFAULT 0,
	FOREIGN KEY(network) REFERENCES Network(id),
	UNIQUE(network, name)
);

CREATE TABLE DeliveryReceipt (
	id INTEGER PRIMARY KEY,
	network INTEGER NOT NULL,
	target TEXT NOT NULL,
	client TEXT,
	internal_msgid TEXT NOT NULL,
	FOREIGN KEY(network) REFERENCES Network(id),
	UNIQUE(network, target, client)
);
`
74
// sqliteMigrations lists the schema migrations in application order. The
// index of an entry is the schema version it upgrades from; the length of the
// slice is the current schema version stored in PRAGMA user_version.
var sqliteMigrations = []string{
	// #0
	"", // migration #0 is reserved for schema initialization
	// #1
	"ALTER TABLE Network ADD COLUMN connect_commands VARCHAR(1023)",
	// #2
	"ALTER TABLE Channel ADD COLUMN detached INTEGER NOT NULL DEFAULT 0",
	// #3
	"ALTER TABLE Network ADD COLUMN sasl_external_cert BLOB DEFAULT NULL",
	// #4
	"ALTER TABLE Network ADD COLUMN sasl_external_key BLOB DEFAULT NULL",
	// #5
	"ALTER TABLE User ADD COLUMN admin INTEGER NOT NULL DEFAULT 0",
	// #6: rebuild User with an explicit integer primary key.
	`
		CREATE TABLE UserNew (
			id INTEGER PRIMARY KEY,
			username VARCHAR(255) NOT NULL UNIQUE,
			password VARCHAR(255),
			admin INTEGER NOT NULL DEFAULT 0
		);
		INSERT INTO UserNew SELECT rowid, username, password, admin FROM User;
		DROP TABLE User;
		ALTER TABLE UserNew RENAME TO User;
	`,
	// #7: rebuild Network so that it references User by id instead of by
	// username.
	`
		CREATE TABLE NetworkNew (
			id INTEGER PRIMARY KEY,
			name VARCHAR(255),
			user INTEGER NOT NULL,
			addr VARCHAR(255) NOT NULL,
			nick VARCHAR(255) NOT NULL,
			username VARCHAR(255),
			realname VARCHAR(255),
			pass VARCHAR(255),
			connect_commands VARCHAR(1023),
			sasl_mechanism VARCHAR(255),
			sasl_plain_username VARCHAR(255),
			sasl_plain_password VARCHAR(255),
			sasl_external_cert BLOB DEFAULT NULL,
			sasl_external_key BLOB DEFAULT NULL,
			FOREIGN KEY(user) REFERENCES User(id),
			UNIQUE(user, addr, nick),
			UNIQUE(user, name)
		);
		INSERT INTO NetworkNew
			SELECT Network.id, name, User.id as user, addr, nick,
				Network.username, realname, pass, connect_commands,
				sasl_mechanism, sasl_plain_username, sasl_plain_password,
				sasl_external_cert, sasl_external_key
			FROM Network
			JOIN User ON Network.user = User.username;
		DROP TABLE Network;
		ALTER TABLE NetworkNew RENAME TO Network;
	`,
	// #8
	`
		ALTER TABLE Channel ADD COLUMN relay_detached INTEGER NOT NULL DEFAULT 0;
		ALTER TABLE Channel ADD COLUMN reattach_on INTEGER NOT NULL DEFAULT 0;
		ALTER TABLE Channel ADD COLUMN detach_after INTEGER NOT NULL DEFAULT 0;
		ALTER TABLE Channel ADD COLUMN detach_on INTEGER NOT NULL DEFAULT 0;
	`,
	// #9
	`
		CREATE TABLE DeliveryReceipt (
			id INTEGER PRIMARY KEY,
			network INTEGER NOT NULL,
			target VARCHAR(255) NOT NULL,
			client VARCHAR(255),
			internal_msgid VARCHAR(255) NOT NULL,
			FOREIGN KEY(network) REFERENCES Network(id),
			UNIQUE(network, target, client)
		);
	`,
	// #10
	"ALTER TABLE Channel ADD COLUMN detached_internal_msgid VARCHAR(255)",
	// #11
	"ALTER TABLE Network ADD COLUMN enabled INTEGER NOT NULL DEFAULT 1",
	// #12
	"ALTER TABLE User ADD COLUMN realname VARCHAR(255)",
	// #13: rebuild Network with TEXT columns and a nullable nick.
	`
		CREATE TABLE NetworkNew (
			id INTEGER PRIMARY KEY,
			name TEXT,
			user INTEGER NOT NULL,
			addr TEXT NOT NULL,
			nick TEXT,
			username TEXT,
			realname TEXT,
			pass TEXT,
			connect_commands TEXT,
			sasl_mechanism TEXT,
			sasl_plain_username TEXT,
			sasl_plain_password TEXT,
			sasl_external_cert BLOB,
			sasl_external_key BLOB,
			enabled INTEGER NOT NULL DEFAULT 1,
			FOREIGN KEY(user) REFERENCES User(id),
			UNIQUE(user, addr, nick),
			UNIQUE(user, name)
		);
		INSERT INTO NetworkNew
			SELECT id, name, user, addr, nick, username, realname, pass,
				connect_commands, sasl_mechanism, sasl_plain_username,
				sasl_plain_password, sasl_external_cert, sasl_external_key,
				enabled
			FROM Network;
		DROP TABLE Network;
		ALTER TABLE NetworkNew RENAME TO Network;
	`,
}
174
// SqliteDB is the SQLite-backed store. Its methods take lock around every use
// of db: the read side for queries, the write side for statements that modify
// data.
type SqliteDB struct {
	// lock guards db.
	lock sync.RWMutex

	// db is the underlying database/sql handle opened with the "sqlite3"
	// driver.
	db *sql.DB
}
179
[620]180func OpenSqliteDB(source string) (Database, error) {
181 sqlSqliteDB, err := sql.Open("sqlite3", source)
[531]182 if err != nil {
183 return nil, err
184 }
185
186 db := &SqliteDB{db: sqlSqliteDB}
187 if err := db.upgrade(); err != nil {
[588]188 sqlSqliteDB.Close()
[531]189 return nil, err
190 }
191
192 return db, nil
193}
194
195func (db *SqliteDB) Close() error {
196 db.lock.Lock()
197 defer db.lock.Unlock()
198 return db.db.Close()
199}
200
201func (db *SqliteDB) upgrade() error {
202 db.lock.Lock()
203 defer db.lock.Unlock()
204
205 var version int
206 if err := db.db.QueryRow("PRAGMA user_version").Scan(&version); err != nil {
207 return fmt.Errorf("failed to query schema version: %v", err)
208 }
209
210 if version == len(sqliteMigrations) {
211 return nil
212 } else if version > len(sqliteMigrations) {
213 return fmt.Errorf("soju (version %d) older than schema (version %d)", len(sqliteMigrations), version)
214 }
215
216 tx, err := db.db.Begin()
217 if err != nil {
218 return err
219 }
220 defer tx.Rollback()
221
222 if version == 0 {
223 if _, err := tx.Exec(sqliteSchema); err != nil {
224 return fmt.Errorf("failed to initialize schema: %v", err)
225 }
226 } else {
227 for i := version; i < len(sqliteMigrations); i++ {
228 if _, err := tx.Exec(sqliteMigrations[i]); err != nil {
229 return fmt.Errorf("failed to execute migration #%v: %v", i, err)
230 }
231 }
232 }
233
234 // For some reason prepared statements don't work here
235 _, err = tx.Exec(fmt.Sprintf("PRAGMA user_version = %d", len(sqliteMigrations)))
236 if err != nil {
237 return fmt.Errorf("failed to bump schema version: %v", err)
238 }
239
240 return tx.Commit()
241}
242
[712]243func (db *SqliteDB) MetricsCollector() prometheus.Collector {
244 return promcollectors.NewDBStatsCollector(db.db, "main")
245}
246
[652]247func (db *SqliteDB) Stats(ctx context.Context) (*DatabaseStats, error) {
[607]248 db.lock.RLock()
249 defer db.lock.RUnlock()
250
[652]251 ctx, cancel := context.WithTimeout(ctx, sqliteQueryTimeout)
[645]252 defer cancel()
253
[607]254 var stats DatabaseStats
[645]255 row := db.db.QueryRowContext(ctx, `SELECT
[607]256 (SELECT COUNT(*) FROM User) AS users,
257 (SELECT COUNT(*) FROM Network) AS networks,
258 (SELECT COUNT(*) FROM Channel) AS channels`)
259 if err := row.Scan(&stats.Users, &stats.Networks, &stats.Channels); err != nil {
260 return nil, err
261 }
262
263 return &stats, nil
264}
265
// toNullString converts s to a sql.NullString in which the empty string is
// represented as SQL NULL (Valid == false).
func toNullString(s string) sql.NullString {
	var ns sql.NullString
	ns.String = s
	ns.Valid = len(s) > 0
	return ns
}
272
[652]273func (db *SqliteDB) ListUsers(ctx context.Context) ([]User, error) {
[531]274 db.lock.RLock()
275 defer db.lock.RUnlock()
276
[652]277 ctx, cancel := context.WithTimeout(ctx, sqliteQueryTimeout)
[645]278 defer cancel()
279
280 rows, err := db.db.QueryContext(ctx,
281 "SELECT id, username, password, admin, realname FROM User")
[531]282 if err != nil {
283 return nil, err
284 }
285 defer rows.Close()
286
287 var users []User
288 for rows.Next() {
289 var user User
[598]290 var password, realname sql.NullString
291 if err := rows.Scan(&user.ID, &user.Username, &password, &user.Admin, &realname); err != nil {
[531]292 return nil, err
293 }
294 user.Password = password.String
[598]295 user.Realname = realname.String
[531]296 users = append(users, user)
297 }
298 if err := rows.Err(); err != nil {
299 return nil, err
300 }
301
302 return users, nil
303}
304
[652]305func (db *SqliteDB) GetUser(ctx context.Context, username string) (*User, error) {
[531]306 db.lock.RLock()
307 defer db.lock.RUnlock()
308
[652]309 ctx, cancel := context.WithTimeout(ctx, sqliteQueryTimeout)
[645]310 defer cancel()
311
[531]312 user := &User{Username: username}
313
[568]314 var password, realname sql.NullString
[645]315 row := db.db.QueryRowContext(ctx,
316 "SELECT id, password, admin, realname FROM User WHERE username = ?",
317 username)
[568]318 if err := row.Scan(&user.ID, &password, &user.Admin, &realname); err != nil {
[531]319 return nil, err
320 }
321 user.Password = password.String
[568]322 user.Realname = realname.String
[531]323 return user, nil
324}
325
[652]326func (db *SqliteDB) StoreUser(ctx context.Context, user *User) error {
[531]327 db.lock.Lock()
328 defer db.lock.Unlock()
329
[652]330 ctx, cancel := context.WithTimeout(ctx, sqliteQueryTimeout)
[645]331 defer cancel()
332
[596]333 args := []interface{}{
334 sql.Named("username", user.Username),
335 sql.Named("password", toNullString(user.Password)),
336 sql.Named("admin", user.Admin),
337 sql.Named("realname", toNullString(user.Realname)),
338 }
[531]339
340 var err error
341 if user.ID != 0 {
[645]342 _, err = db.db.ExecContext(ctx, `
343 UPDATE User SET password = :password, admin = :admin,
344 realname = :realname WHERE username = :username`,
345 args...)
[531]346 } else {
347 var res sql.Result
[645]348 res, err = db.db.ExecContext(ctx, `
349 INSERT INTO
350 User(username, password, admin, realname)
351 VALUES (:username, :password, :admin, :realname)`,
352 args...)
[531]353 if err != nil {
354 return err
355 }
356 user.ID, err = res.LastInsertId()
357 }
358
359 return err
360}
361
[652]362func (db *SqliteDB) DeleteUser(ctx context.Context, id int64) error {
[531]363 db.lock.Lock()
364 defer db.lock.Unlock()
365
[652]366 ctx, cancel := context.WithTimeout(ctx, sqliteQueryTimeout)
[645]367 defer cancel()
368
[531]369 tx, err := db.db.Begin()
370 if err != nil {
371 return err
372 }
373 defer tx.Rollback()
374
[645]375 _, err = tx.ExecContext(ctx, `DELETE FROM DeliveryReceipt
[595]376 WHERE id IN (
377 SELECT DeliveryReceipt.id
378 FROM DeliveryReceipt
379 JOIN Network ON DeliveryReceipt.network = Network.id
380 WHERE Network.user = ?
381 )`, id)
382 if err != nil {
383 return err
384 }
385
[645]386 _, err = tx.ExecContext(ctx, `DELETE FROM Channel
[531]387 WHERE id IN (
388 SELECT Channel.id
389 FROM Channel
390 JOIN Network ON Channel.network = Network.id
391 WHERE Network.user = ?
392 )`, id)
393 if err != nil {
394 return err
395 }
396
[645]397 _, err = tx.ExecContext(ctx, "DELETE FROM Network WHERE user = ?", id)
[531]398 if err != nil {
399 return err
400 }
401
[645]402 _, err = tx.ExecContext(ctx, "DELETE FROM User WHERE id = ?", id)
[531]403 if err != nil {
404 return err
405 }
406
407 return tx.Commit()
408}
409
[652]410func (db *SqliteDB) ListNetworks(ctx context.Context, userID int64) ([]Network, error) {
[531]411 db.lock.RLock()
412 defer db.lock.RUnlock()
413
[652]414 ctx, cancel := context.WithTimeout(ctx, sqliteQueryTimeout)
[645]415 defer cancel()
416
417 rows, err := db.db.QueryContext(ctx, `
418 SELECT id, name, addr, nick, username, realname, pass,
[531]419 connect_commands, sasl_mechanism, sasl_plain_username, sasl_plain_password,
[542]420 sasl_external_cert, sasl_external_key, enabled
[531]421 FROM Network
422 WHERE user = ?`,
423 userID)
424 if err != nil {
425 return nil, err
426 }
427 defer rows.Close()
428
429 var networks []Network
430 for rows.Next() {
431 var net Network
[664]432 var name, nick, username, realname, pass, connectCommands sql.NullString
[531]433 var saslMechanism, saslPlainUsername, saslPlainPassword sql.NullString
[664]434 err := rows.Scan(&net.ID, &name, &net.Addr, &nick, &username, &realname,
[531]435 &pass, &connectCommands, &saslMechanism, &saslPlainUsername, &saslPlainPassword,
[542]436 &net.SASL.External.CertBlob, &net.SASL.External.PrivKeyBlob, &net.Enabled)
[531]437 if err != nil {
438 return nil, err
439 }
440 net.Name = name.String
[664]441 net.Nick = nick.String
[531]442 net.Username = username.String
443 net.Realname = realname.String
444 net.Pass = pass.String
445 if connectCommands.Valid {
446 net.ConnectCommands = strings.Split(connectCommands.String, "\r\n")
447 }
448 net.SASL.Mechanism = saslMechanism.String
449 net.SASL.Plain.Username = saslPlainUsername.String
450 net.SASL.Plain.Password = saslPlainPassword.String
451 networks = append(networks, net)
452 }
453 if err := rows.Err(); err != nil {
454 return nil, err
455 }
456
457 return networks, nil
458}
459
[652]460func (db *SqliteDB) StoreNetwork(ctx context.Context, userID int64, network *Network) error {
[531]461 db.lock.Lock()
462 defer db.lock.Unlock()
463
[652]464 ctx, cancel := context.WithTimeout(ctx, sqliteQueryTimeout)
[645]465 defer cancel()
466
[531]467 var saslMechanism, saslPlainUsername, saslPlainPassword sql.NullString
468 if network.SASL.Mechanism != "" {
469 saslMechanism = toNullString(network.SASL.Mechanism)
470 switch network.SASL.Mechanism {
471 case "PLAIN":
472 saslPlainUsername = toNullString(network.SASL.Plain.Username)
473 saslPlainPassword = toNullString(network.SASL.Plain.Password)
474 network.SASL.External.CertBlob = nil
475 network.SASL.External.PrivKeyBlob = nil
476 case "EXTERNAL":
477 // keep saslPlain* nil
478 default:
479 return fmt.Errorf("soju: cannot store network: unsupported SASL mechanism %q", network.SASL.Mechanism)
480 }
481 }
482
[596]483 args := []interface{}{
484 sql.Named("name", toNullString(network.Name)),
485 sql.Named("addr", network.Addr),
[664]486 sql.Named("nick", toNullString(network.Nick)),
[596]487 sql.Named("username", toNullString(network.Username)),
488 sql.Named("realname", toNullString(network.Realname)),
489 sql.Named("pass", toNullString(network.Pass)),
490 sql.Named("connect_commands", toNullString(strings.Join(network.ConnectCommands, "\r\n"))),
491 sql.Named("sasl_mechanism", saslMechanism),
492 sql.Named("sasl_plain_username", saslPlainUsername),
493 sql.Named("sasl_plain_password", saslPlainPassword),
494 sql.Named("sasl_external_cert", network.SASL.External.CertBlob),
495 sql.Named("sasl_external_key", network.SASL.External.PrivKeyBlob),
496 sql.Named("enabled", network.Enabled),
497
498 sql.Named("id", network.ID), // only for UPDATE
499 sql.Named("user", userID), // only for INSERT
500 }
501
[531]502 var err error
503 if network.ID != 0 {
[645]504 _, err = db.db.ExecContext(ctx, `
[596]505 UPDATE Network
506 SET name = :name, addr = :addr, nick = :nick, username = :username,
507 realname = :realname, pass = :pass, connect_commands = :connect_commands,
508 sasl_mechanism = :sasl_mechanism, sasl_plain_username = :sasl_plain_username, sasl_plain_password = :sasl_plain_password,
509 sasl_external_cert = :sasl_external_cert, sasl_external_key = :sasl_external_key,
510 enabled = :enabled
511 WHERE id = :id`, args...)
[531]512 } else {
513 var res sql.Result
[645]514 res, err = db.db.ExecContext(ctx, `
[596]515 INSERT INTO Network(user, name, addr, nick, username, realname, pass,
516 connect_commands, sasl_mechanism, sasl_plain_username,
[542]517 sasl_plain_password, sasl_external_cert, sasl_external_key, enabled)
[596]518 VALUES (:user, :name, :addr, :nick, :username, :realname, :pass,
519 :connect_commands, :sasl_mechanism, :sasl_plain_username,
520 :sasl_plain_password, :sasl_external_cert, :sasl_external_key, :enabled)`,
521 args...)
[531]522 if err != nil {
523 return err
524 }
525 network.ID, err = res.LastInsertId()
526 }
527 return err
528}
529
[652]530func (db *SqliteDB) DeleteNetwork(ctx context.Context, id int64) error {
[531]531 db.lock.Lock()
532 defer db.lock.Unlock()
533
[652]534 ctx, cancel := context.WithTimeout(ctx, sqliteQueryTimeout)
[645]535 defer cancel()
536
[531]537 tx, err := db.db.Begin()
538 if err != nil {
539 return err
540 }
541 defer tx.Rollback()
542
[645]543 _, err = tx.ExecContext(ctx, "DELETE FROM DeliveryReceipt WHERE network = ?", id)
[595]544 if err != nil {
545 return err
546 }
547
[645]548 _, err = tx.ExecContext(ctx, "DELETE FROM Channel WHERE network = ?", id)
[531]549 if err != nil {
550 return err
551 }
552
[645]553 _, err = tx.ExecContext(ctx, "DELETE FROM Network WHERE id = ?", id)
[531]554 if err != nil {
555 return err
556 }
557
558 return tx.Commit()
559}
560
[652]561func (db *SqliteDB) ListChannels(ctx context.Context, networkID int64) ([]Channel, error) {
[531]562 db.lock.RLock()
563 defer db.lock.RUnlock()
564
[652]565 ctx, cancel := context.WithTimeout(ctx, sqliteQueryTimeout)
[645]566 defer cancel()
567
568 rows, err := db.db.QueryContext(ctx, `SELECT
[531]569 id, name, key, detached, detached_internal_msgid,
570 relay_detached, reattach_on, detach_after, detach_on
571 FROM Channel
572 WHERE network = ?`, networkID)
573 if err != nil {
574 return nil, err
575 }
576 defer rows.Close()
577
578 var channels []Channel
579 for rows.Next() {
580 var ch Channel
581 var key, detachedInternalMsgID sql.NullString
582 var detachAfter int64
583 if err := rows.Scan(&ch.ID, &ch.Name, &key, &ch.Detached, &detachedInternalMsgID, &ch.RelayDetached, &ch.ReattachOn, &detachAfter, &ch.DetachOn); err != nil {
584 return nil, err
585 }
586 ch.Key = key.String
587 ch.DetachedInternalMsgID = detachedInternalMsgID.String
588 ch.DetachAfter = time.Duration(detachAfter) * time.Second
589 channels = append(channels, ch)
590 }
591 if err := rows.Err(); err != nil {
592 return nil, err
593 }
594
595 return channels, nil
596}
597
[652]598func (db *SqliteDB) StoreChannel(ctx context.Context, networkID int64, ch *Channel) error {
[531]599 db.lock.Lock()
600 defer db.lock.Unlock()
601
[652]602 ctx, cancel := context.WithTimeout(ctx, sqliteQueryTimeout)
[645]603 defer cancel()
604
[596]605 args := []interface{}{
606 sql.Named("network", networkID),
607 sql.Named("name", ch.Name),
608 sql.Named("key", toNullString(ch.Key)),
609 sql.Named("detached", ch.Detached),
610 sql.Named("detached_internal_msgid", toNullString(ch.DetachedInternalMsgID)),
611 sql.Named("relay_detached", ch.RelayDetached),
612 sql.Named("reattach_on", ch.ReattachOn),
613 sql.Named("detach_after", int64(math.Ceil(ch.DetachAfter.Seconds()))),
614 sql.Named("detach_on", ch.DetachOn),
[531]615
[596]616 sql.Named("id", ch.ID), // only for UPDATE
617 }
618
[531]619 var err error
620 if ch.ID != 0 {
[645]621 _, err = db.db.ExecContext(ctx, `UPDATE Channel
[596]622 SET network = :network, name = :name, key = :key, detached = :detached,
623 detached_internal_msgid = :detached_internal_msgid, relay_detached = :relay_detached,
624 reattach_on = :reattach_on, detach_after = :detach_after, detach_on = :detach_on
625 WHERE id = :id`, args...)
[531]626 } else {
627 var res sql.Result
[645]628 res, err = db.db.ExecContext(ctx, `INSERT INTO Channel(network, name, key, detached, detached_internal_msgid, relay_detached, reattach_on, detach_after, detach_on)
[596]629 VALUES (:network, :name, :key, :detached, :detached_internal_msgid, :relay_detached, :reattach_on, :detach_after, :detach_on)`, args...)
[531]630 if err != nil {
631 return err
632 }
633 ch.ID, err = res.LastInsertId()
634 }
635 return err
636}
637
[652]638func (db *SqliteDB) DeleteChannel(ctx context.Context, id int64) error {
[531]639 db.lock.Lock()
640 defer db.lock.Unlock()
641
[652]642 ctx, cancel := context.WithTimeout(ctx, sqliteQueryTimeout)
[645]643 defer cancel()
644
645 _, err := db.db.ExecContext(ctx, "DELETE FROM Channel WHERE id = ?", id)
[531]646 return err
647}
648
[652]649func (db *SqliteDB) ListDeliveryReceipts(ctx context.Context, networkID int64) ([]DeliveryReceipt, error) {
[531]650 db.lock.RLock()
651 defer db.lock.RUnlock()
652
[652]653 ctx, cancel := context.WithTimeout(ctx, sqliteQueryTimeout)
[645]654 defer cancel()
655
656 rows, err := db.db.QueryContext(ctx, `
657 SELECT id, target, client, internal_msgid
[531]658 FROM DeliveryReceipt
659 WHERE network = ?`, networkID)
660 if err != nil {
661 return nil, err
662 }
663 defer rows.Close()
664
665 var receipts []DeliveryReceipt
666 for rows.Next() {
667 var rcpt DeliveryReceipt
668 var client sql.NullString
669 if err := rows.Scan(&rcpt.ID, &rcpt.Target, &client, &rcpt.InternalMsgID); err != nil {
670 return nil, err
671 }
672 rcpt.Client = client.String
673 receipts = append(receipts, rcpt)
674 }
675 if err := rows.Err(); err != nil {
676 return nil, err
677 }
678
679 return receipts, nil
680}
681
[652]682func (db *SqliteDB) StoreClientDeliveryReceipts(ctx context.Context, networkID int64, client string, receipts []DeliveryReceipt) error {
[531]683 db.lock.Lock()
684 defer db.lock.Unlock()
685
[652]686 ctx, cancel := context.WithTimeout(ctx, sqliteQueryTimeout)
[645]687 defer cancel()
688
[531]689 tx, err := db.db.Begin()
690 if err != nil {
691 return err
692 }
693 defer tx.Rollback()
694
[645]695 _, err = tx.ExecContext(ctx, "DELETE FROM DeliveryReceipt WHERE network = ? AND client IS ?",
[531]696 networkID, toNullString(client))
697 if err != nil {
698 return err
699 }
700
701 for i := range receipts {
702 rcpt := &receipts[i]
703
[645]704 res, err := tx.ExecContext(ctx, `
705 INSERT INTO DeliveryReceipt(network, target, client, internal_msgid)
[596]706 VALUES (:network, :target, :client, :internal_msgid)`,
707 sql.Named("network", networkID),
708 sql.Named("target", rcpt.Target),
709 sql.Named("client", toNullString(client)),
710 sql.Named("internal_msgid", rcpt.InternalMsgID))
[531]711 if err != nil {
712 return err
713 }
714 rcpt.ID, err = res.LastInsertId()
715 if err != nil {
716 return err
717 }
718 }
719
720 return tx.Commit()
721}
Note: See TracBrowser for help on using the repository browser.