source: code/trunk/db_sqlite.go@ 661

Last change on this file since 661 was 652, checked in by contact, 4 years ago

Add context args to Database interface

This is a mechanical change, which just lifts up the context.TODO()
calls from inside the DB implementations to the callers.

Future work involves properly wiring up the contexts when it makes
sense.

File size: 18.5 KB
RevLine 
[531]1package soju
2
3import (
[645]4 "context"
[531]5 "database/sql"
6 "fmt"
7 "math"
8 "strings"
9 "sync"
10 "time"
11
12 _ "github.com/mattn/go-sqlite3"
13)
14
// sqliteQueryTimeout is the deadline that the context-taking SqliteDB
// methods in this file apply, via context.WithTimeout, to the context they
// pass to their SQL statements.
const sqliteQueryTimeout = 5 * time.Second
16
// sqliteSchema holds the statements that create the complete, current
// schema (User, Network, Channel and DeliveryReceipt tables). upgrade
// executes it when the database reports a user_version of 0, instead of
// replaying the individual entries of sqliteMigrations.
const sqliteSchema = `
CREATE TABLE User (
	id INTEGER PRIMARY KEY,
	username VARCHAR(255) NOT NULL UNIQUE,
	password VARCHAR(255),
	admin INTEGER NOT NULL DEFAULT 0,
	realname VARCHAR(255)
);

CREATE TABLE Network (
	id INTEGER PRIMARY KEY,
	name VARCHAR(255),
	user INTEGER NOT NULL,
	addr VARCHAR(255) NOT NULL,
	nick VARCHAR(255) NOT NULL,
	username VARCHAR(255),
	realname VARCHAR(255),
	pass VARCHAR(255),
	connect_commands VARCHAR(1023),
	sasl_mechanism VARCHAR(255),
	sasl_plain_username VARCHAR(255),
	sasl_plain_password VARCHAR(255),
	sasl_external_cert BLOB DEFAULT NULL,
	sasl_external_key BLOB DEFAULT NULL,
	enabled INTEGER NOT NULL DEFAULT 1,
	FOREIGN KEY(user) REFERENCES User(id),
	UNIQUE(user, addr, nick),
	UNIQUE(user, name)
);

CREATE TABLE Channel (
	id INTEGER PRIMARY KEY,
	network INTEGER NOT NULL,
	name VARCHAR(255) NOT NULL,
	key VARCHAR(255),
	detached INTEGER NOT NULL DEFAULT 0,
	detached_internal_msgid VARCHAR(255),
	relay_detached INTEGER NOT NULL DEFAULT 0,
	reattach_on INTEGER NOT NULL DEFAULT 0,
	detach_after INTEGER NOT NULL DEFAULT 0,
	detach_on INTEGER NOT NULL DEFAULT 0,
	FOREIGN KEY(network) REFERENCES Network(id),
	UNIQUE(network, name)
);

CREATE TABLE DeliveryReceipt (
	id INTEGER PRIMARY KEY,
	network INTEGER NOT NULL,
	target VARCHAR(255) NOT NULL,
	client VARCHAR(255),
	internal_msgid VARCHAR(255) NOT NULL,
	FOREIGN KEY(network) REFERENCES Network(id),
	UNIQUE(network, target, client)
);
`
72
// sqliteMigrations lists the incremental schema migrations, indexed by the
// schema version they upgrade from. upgrade reads the stored user_version v,
// executes entries v through len(sqliteMigrations)-1 in order, and then sets
// user_version to len(sqliteMigrations).
//
// NOTE(review): because the slice index doubles as the schema version,
// existing entries presumably must never be reordered or removed; new
// migrations are appended at the end (and sqliteSchema kept in sync).
var sqliteMigrations = []string{
	"", // migration #0 is reserved for schema initialization
	"ALTER TABLE Network ADD COLUMN connect_commands VARCHAR(1023)",
	"ALTER TABLE Channel ADD COLUMN detached INTEGER NOT NULL DEFAULT 0",
	"ALTER TABLE Network ADD COLUMN sasl_external_cert BLOB DEFAULT NULL",
	"ALTER TABLE Network ADD COLUMN sasl_external_key BLOB DEFAULT NULL",
	"ALTER TABLE User ADD COLUMN admin INTEGER NOT NULL DEFAULT 0",
	// Rebuild User with an explicit integer primary key.
	`
		CREATE TABLE UserNew (
			id INTEGER PRIMARY KEY,
			username VARCHAR(255) NOT NULL UNIQUE,
			password VARCHAR(255),
			admin INTEGER NOT NULL DEFAULT 0
		);
		INSERT INTO UserNew SELECT rowid, username, password, admin FROM User;
		DROP TABLE User;
		ALTER TABLE UserNew RENAME TO User;
	`,
	// Rebuild Network so that its user column references User.id; the old
	// column is joined against User.username to resolve the ID.
	`
		CREATE TABLE NetworkNew (
			id INTEGER PRIMARY KEY,
			name VARCHAR(255),
			user INTEGER NOT NULL,
			addr VARCHAR(255) NOT NULL,
			nick VARCHAR(255) NOT NULL,
			username VARCHAR(255),
			realname VARCHAR(255),
			pass VARCHAR(255),
			connect_commands VARCHAR(1023),
			sasl_mechanism VARCHAR(255),
			sasl_plain_username VARCHAR(255),
			sasl_plain_password VARCHAR(255),
			sasl_external_cert BLOB DEFAULT NULL,
			sasl_external_key BLOB DEFAULT NULL,
			FOREIGN KEY(user) REFERENCES User(id),
			UNIQUE(user, addr, nick),
			UNIQUE(user, name)
		);
		INSERT INTO NetworkNew
			SELECT Network.id, name, User.id as user, addr, nick,
				Network.username, realname, pass, connect_commands,
				sasl_mechanism, sasl_plain_username, sasl_plain_password,
				sasl_external_cert, sasl_external_key
			FROM Network
			JOIN User ON Network.user = User.username;
		DROP TABLE Network;
		ALTER TABLE NetworkNew RENAME TO Network;
	`,
	`
		ALTER TABLE Channel ADD COLUMN relay_detached INTEGER NOT NULL DEFAULT 0;
		ALTER TABLE Channel ADD COLUMN reattach_on INTEGER NOT NULL DEFAULT 0;
		ALTER TABLE Channel ADD COLUMN detach_after INTEGER NOT NULL DEFAULT 0;
		ALTER TABLE Channel ADD COLUMN detach_on INTEGER NOT NULL DEFAULT 0;
	`,
	`
		CREATE TABLE DeliveryReceipt (
			id INTEGER PRIMARY KEY,
			network INTEGER NOT NULL,
			target VARCHAR(255) NOT NULL,
			client VARCHAR(255),
			internal_msgid VARCHAR(255) NOT NULL,
			FOREIGN KEY(network) REFERENCES Network(id),
			UNIQUE(network, target, client)
		);
	`,
	"ALTER TABLE Channel ADD COLUMN detached_internal_msgid VARCHAR(255)",
	"ALTER TABLE Network ADD COLUMN enabled INTEGER NOT NULL DEFAULT 1",
	"ALTER TABLE User ADD COLUMN realname VARCHAR(255)",
}
142
// SqliteDB is a database backend built on a database/sql handle opened with
// the "sqlite3" driver (see OpenSqliteDB).
//
// lock serializes access to db: the read-only methods in this file take it
// in read mode, while methods that modify the database, as well as Close and
// upgrade, take it in write mode.
type SqliteDB struct {
	lock sync.RWMutex
	db   *sql.DB
}
147
[620]148func OpenSqliteDB(source string) (Database, error) {
149 sqlSqliteDB, err := sql.Open("sqlite3", source)
[531]150 if err != nil {
151 return nil, err
152 }
153
154 db := &SqliteDB{db: sqlSqliteDB}
155 if err := db.upgrade(); err != nil {
[588]156 sqlSqliteDB.Close()
[531]157 return nil, err
158 }
159
160 return db, nil
161}
162
// Close closes the underlying database handle and returns the error, if
// any, reported by it. The write lock is held for the duration of the call,
// so Close waits for in-flight methods that hold the lock to finish.
func (db *SqliteDB) Close() error {
	db.lock.Lock()
	defer db.lock.Unlock()
	return db.db.Close()
}
168
169func (db *SqliteDB) upgrade() error {
170 db.lock.Lock()
171 defer db.lock.Unlock()
172
173 var version int
174 if err := db.db.QueryRow("PRAGMA user_version").Scan(&version); err != nil {
175 return fmt.Errorf("failed to query schema version: %v", err)
176 }
177
178 if version == len(sqliteMigrations) {
179 return nil
180 } else if version > len(sqliteMigrations) {
181 return fmt.Errorf("soju (version %d) older than schema (version %d)", len(sqliteMigrations), version)
182 }
183
184 tx, err := db.db.Begin()
185 if err != nil {
186 return err
187 }
188 defer tx.Rollback()
189
190 if version == 0 {
191 if _, err := tx.Exec(sqliteSchema); err != nil {
192 return fmt.Errorf("failed to initialize schema: %v", err)
193 }
194 } else {
195 for i := version; i < len(sqliteMigrations); i++ {
196 if _, err := tx.Exec(sqliteMigrations[i]); err != nil {
197 return fmt.Errorf("failed to execute migration #%v: %v", i, err)
198 }
199 }
200 }
201
202 // For some reason prepared statements don't work here
203 _, err = tx.Exec(fmt.Sprintf("PRAGMA user_version = %d", len(sqliteMigrations)))
204 if err != nil {
205 return fmt.Errorf("failed to bump schema version: %v", err)
206 }
207
208 return tx.Commit()
209}
210
[652]211func (db *SqliteDB) Stats(ctx context.Context) (*DatabaseStats, error) {
[607]212 db.lock.RLock()
213 defer db.lock.RUnlock()
214
[652]215 ctx, cancel := context.WithTimeout(ctx, sqliteQueryTimeout)
[645]216 defer cancel()
217
[607]218 var stats DatabaseStats
[645]219 row := db.db.QueryRowContext(ctx, `SELECT
[607]220 (SELECT COUNT(*) FROM User) AS users,
221 (SELECT COUNT(*) FROM Network) AS networks,
222 (SELECT COUNT(*) FROM Channel) AS channels`)
223 if err := row.Scan(&stats.Users, &stats.Networks, &stats.Channels); err != nil {
224 return nil, err
225 }
226
227 return &stats, nil
228}
229
// toNullString converts s into a sql.NullString in which the empty string
// is represented as NULL (Valid == false) and any other value as a valid
// string.
func toNullString(s string) sql.NullString {
	if s == "" {
		return sql.NullString{}
	}
	return sql.NullString{String: s, Valid: true}
}
236
[652]237func (db *SqliteDB) ListUsers(ctx context.Context) ([]User, error) {
[531]238 db.lock.RLock()
239 defer db.lock.RUnlock()
240
[652]241 ctx, cancel := context.WithTimeout(ctx, sqliteQueryTimeout)
[645]242 defer cancel()
243
244 rows, err := db.db.QueryContext(ctx,
245 "SELECT id, username, password, admin, realname FROM User")
[531]246 if err != nil {
247 return nil, err
248 }
249 defer rows.Close()
250
251 var users []User
252 for rows.Next() {
253 var user User
[598]254 var password, realname sql.NullString
255 if err := rows.Scan(&user.ID, &user.Username, &password, &user.Admin, &realname); err != nil {
[531]256 return nil, err
257 }
258 user.Password = password.String
[598]259 user.Realname = realname.String
[531]260 users = append(users, user)
261 }
262 if err := rows.Err(); err != nil {
263 return nil, err
264 }
265
266 return users, nil
267}
268
[652]269func (db *SqliteDB) GetUser(ctx context.Context, username string) (*User, error) {
[531]270 db.lock.RLock()
271 defer db.lock.RUnlock()
272
[652]273 ctx, cancel := context.WithTimeout(ctx, sqliteQueryTimeout)
[645]274 defer cancel()
275
[531]276 user := &User{Username: username}
277
[568]278 var password, realname sql.NullString
[645]279 row := db.db.QueryRowContext(ctx,
280 "SELECT id, password, admin, realname FROM User WHERE username = ?",
281 username)
[568]282 if err := row.Scan(&user.ID, &password, &user.Admin, &realname); err != nil {
[531]283 return nil, err
284 }
285 user.Password = password.String
[568]286 user.Realname = realname.String
[531]287 return user, nil
288}
289
[652]290func (db *SqliteDB) StoreUser(ctx context.Context, user *User) error {
[531]291 db.lock.Lock()
292 defer db.lock.Unlock()
293
[652]294 ctx, cancel := context.WithTimeout(ctx, sqliteQueryTimeout)
[645]295 defer cancel()
296
[596]297 args := []interface{}{
298 sql.Named("username", user.Username),
299 sql.Named("password", toNullString(user.Password)),
300 sql.Named("admin", user.Admin),
301 sql.Named("realname", toNullString(user.Realname)),
302 }
[531]303
304 var err error
305 if user.ID != 0 {
[645]306 _, err = db.db.ExecContext(ctx, `
307 UPDATE User SET password = :password, admin = :admin,
308 realname = :realname WHERE username = :username`,
309 args...)
[531]310 } else {
311 var res sql.Result
[645]312 res, err = db.db.ExecContext(ctx, `
313 INSERT INTO
314 User(username, password, admin, realname)
315 VALUES (:username, :password, :admin, :realname)`,
316 args...)
[531]317 if err != nil {
318 return err
319 }
320 user.ID, err = res.LastInsertId()
321 }
322
323 return err
324}
325
[652]326func (db *SqliteDB) DeleteUser(ctx context.Context, id int64) error {
[531]327 db.lock.Lock()
328 defer db.lock.Unlock()
329
[652]330 ctx, cancel := context.WithTimeout(ctx, sqliteQueryTimeout)
[645]331 defer cancel()
332
[531]333 tx, err := db.db.Begin()
334 if err != nil {
335 return err
336 }
337 defer tx.Rollback()
338
[645]339 _, err = tx.ExecContext(ctx, `DELETE FROM DeliveryReceipt
[595]340 WHERE id IN (
341 SELECT DeliveryReceipt.id
342 FROM DeliveryReceipt
343 JOIN Network ON DeliveryReceipt.network = Network.id
344 WHERE Network.user = ?
345 )`, id)
346 if err != nil {
347 return err
348 }
349
[645]350 _, err = tx.ExecContext(ctx, `DELETE FROM Channel
[531]351 WHERE id IN (
352 SELECT Channel.id
353 FROM Channel
354 JOIN Network ON Channel.network = Network.id
355 WHERE Network.user = ?
356 )`, id)
357 if err != nil {
358 return err
359 }
360
[645]361 _, err = tx.ExecContext(ctx, "DELETE FROM Network WHERE user = ?", id)
[531]362 if err != nil {
363 return err
364 }
365
[645]366 _, err = tx.ExecContext(ctx, "DELETE FROM User WHERE id = ?", id)
[531]367 if err != nil {
368 return err
369 }
370
371 return tx.Commit()
372}
373
[652]374func (db *SqliteDB) ListNetworks(ctx context.Context, userID int64) ([]Network, error) {
[531]375 db.lock.RLock()
376 defer db.lock.RUnlock()
377
[652]378 ctx, cancel := context.WithTimeout(ctx, sqliteQueryTimeout)
[645]379 defer cancel()
380
381 rows, err := db.db.QueryContext(ctx, `
382 SELECT id, name, addr, nick, username, realname, pass,
[531]383 connect_commands, sasl_mechanism, sasl_plain_username, sasl_plain_password,
[542]384 sasl_external_cert, sasl_external_key, enabled
[531]385 FROM Network
386 WHERE user = ?`,
387 userID)
388 if err != nil {
389 return nil, err
390 }
391 defer rows.Close()
392
393 var networks []Network
394 for rows.Next() {
395 var net Network
396 var name, username, realname, pass, connectCommands sql.NullString
397 var saslMechanism, saslPlainUsername, saslPlainPassword sql.NullString
398 err := rows.Scan(&net.ID, &name, &net.Addr, &net.Nick, &username, &realname,
399 &pass, &connectCommands, &saslMechanism, &saslPlainUsername, &saslPlainPassword,
[542]400 &net.SASL.External.CertBlob, &net.SASL.External.PrivKeyBlob, &net.Enabled)
[531]401 if err != nil {
402 return nil, err
403 }
404 net.Name = name.String
405 net.Username = username.String
406 net.Realname = realname.String
407 net.Pass = pass.String
408 if connectCommands.Valid {
409 net.ConnectCommands = strings.Split(connectCommands.String, "\r\n")
410 }
411 net.SASL.Mechanism = saslMechanism.String
412 net.SASL.Plain.Username = saslPlainUsername.String
413 net.SASL.Plain.Password = saslPlainPassword.String
414 networks = append(networks, net)
415 }
416 if err := rows.Err(); err != nil {
417 return nil, err
418 }
419
420 return networks, nil
421}
422
[652]423func (db *SqliteDB) StoreNetwork(ctx context.Context, userID int64, network *Network) error {
[531]424 db.lock.Lock()
425 defer db.lock.Unlock()
426
[652]427 ctx, cancel := context.WithTimeout(ctx, sqliteQueryTimeout)
[645]428 defer cancel()
429
[531]430 var saslMechanism, saslPlainUsername, saslPlainPassword sql.NullString
431 if network.SASL.Mechanism != "" {
432 saslMechanism = toNullString(network.SASL.Mechanism)
433 switch network.SASL.Mechanism {
434 case "PLAIN":
435 saslPlainUsername = toNullString(network.SASL.Plain.Username)
436 saslPlainPassword = toNullString(network.SASL.Plain.Password)
437 network.SASL.External.CertBlob = nil
438 network.SASL.External.PrivKeyBlob = nil
439 case "EXTERNAL":
440 // keep saslPlain* nil
441 default:
442 return fmt.Errorf("soju: cannot store network: unsupported SASL mechanism %q", network.SASL.Mechanism)
443 }
444 }
445
[596]446 args := []interface{}{
447 sql.Named("name", toNullString(network.Name)),
448 sql.Named("addr", network.Addr),
449 sql.Named("nick", network.Nick),
450 sql.Named("username", toNullString(network.Username)),
451 sql.Named("realname", toNullString(network.Realname)),
452 sql.Named("pass", toNullString(network.Pass)),
453 sql.Named("connect_commands", toNullString(strings.Join(network.ConnectCommands, "\r\n"))),
454 sql.Named("sasl_mechanism", saslMechanism),
455 sql.Named("sasl_plain_username", saslPlainUsername),
456 sql.Named("sasl_plain_password", saslPlainPassword),
457 sql.Named("sasl_external_cert", network.SASL.External.CertBlob),
458 sql.Named("sasl_external_key", network.SASL.External.PrivKeyBlob),
459 sql.Named("enabled", network.Enabled),
460
461 sql.Named("id", network.ID), // only for UPDATE
462 sql.Named("user", userID), // only for INSERT
463 }
464
[531]465 var err error
466 if network.ID != 0 {
[645]467 _, err = db.db.ExecContext(ctx, `
[596]468 UPDATE Network
469 SET name = :name, addr = :addr, nick = :nick, username = :username,
470 realname = :realname, pass = :pass, connect_commands = :connect_commands,
471 sasl_mechanism = :sasl_mechanism, sasl_plain_username = :sasl_plain_username, sasl_plain_password = :sasl_plain_password,
472 sasl_external_cert = :sasl_external_cert, sasl_external_key = :sasl_external_key,
473 enabled = :enabled
474 WHERE id = :id`, args...)
[531]475 } else {
476 var res sql.Result
[645]477 res, err = db.db.ExecContext(ctx, `
[596]478 INSERT INTO Network(user, name, addr, nick, username, realname, pass,
479 connect_commands, sasl_mechanism, sasl_plain_username,
[542]480 sasl_plain_password, sasl_external_cert, sasl_external_key, enabled)
[596]481 VALUES (:user, :name, :addr, :nick, :username, :realname, :pass,
482 :connect_commands, :sasl_mechanism, :sasl_plain_username,
483 :sasl_plain_password, :sasl_external_cert, :sasl_external_key, :enabled)`,
484 args...)
[531]485 if err != nil {
486 return err
487 }
488 network.ID, err = res.LastInsertId()
489 }
490 return err
491}
492
[652]493func (db *SqliteDB) DeleteNetwork(ctx context.Context, id int64) error {
[531]494 db.lock.Lock()
495 defer db.lock.Unlock()
496
[652]497 ctx, cancel := context.WithTimeout(ctx, sqliteQueryTimeout)
[645]498 defer cancel()
499
[531]500 tx, err := db.db.Begin()
501 if err != nil {
502 return err
503 }
504 defer tx.Rollback()
505
[645]506 _, err = tx.ExecContext(ctx, "DELETE FROM DeliveryReceipt WHERE network = ?", id)
[595]507 if err != nil {
508 return err
509 }
510
[645]511 _, err = tx.ExecContext(ctx, "DELETE FROM Channel WHERE network = ?", id)
[531]512 if err != nil {
513 return err
514 }
515
[645]516 _, err = tx.ExecContext(ctx, "DELETE FROM Network WHERE id = ?", id)
[531]517 if err != nil {
518 return err
519 }
520
521 return tx.Commit()
522}
523
[652]524func (db *SqliteDB) ListChannels(ctx context.Context, networkID int64) ([]Channel, error) {
[531]525 db.lock.RLock()
526 defer db.lock.RUnlock()
527
[652]528 ctx, cancel := context.WithTimeout(ctx, sqliteQueryTimeout)
[645]529 defer cancel()
530
531 rows, err := db.db.QueryContext(ctx, `SELECT
[531]532 id, name, key, detached, detached_internal_msgid,
533 relay_detached, reattach_on, detach_after, detach_on
534 FROM Channel
535 WHERE network = ?`, networkID)
536 if err != nil {
537 return nil, err
538 }
539 defer rows.Close()
540
541 var channels []Channel
542 for rows.Next() {
543 var ch Channel
544 var key, detachedInternalMsgID sql.NullString
545 var detachAfter int64
546 if err := rows.Scan(&ch.ID, &ch.Name, &key, &ch.Detached, &detachedInternalMsgID, &ch.RelayDetached, &ch.ReattachOn, &detachAfter, &ch.DetachOn); err != nil {
547 return nil, err
548 }
549 ch.Key = key.String
550 ch.DetachedInternalMsgID = detachedInternalMsgID.String
551 ch.DetachAfter = time.Duration(detachAfter) * time.Second
552 channels = append(channels, ch)
553 }
554 if err := rows.Err(); err != nil {
555 return nil, err
556 }
557
558 return channels, nil
559}
560
[652]561func (db *SqliteDB) StoreChannel(ctx context.Context, networkID int64, ch *Channel) error {
[531]562 db.lock.Lock()
563 defer db.lock.Unlock()
564
[652]565 ctx, cancel := context.WithTimeout(ctx, sqliteQueryTimeout)
[645]566 defer cancel()
567
[596]568 args := []interface{}{
569 sql.Named("network", networkID),
570 sql.Named("name", ch.Name),
571 sql.Named("key", toNullString(ch.Key)),
572 sql.Named("detached", ch.Detached),
573 sql.Named("detached_internal_msgid", toNullString(ch.DetachedInternalMsgID)),
574 sql.Named("relay_detached", ch.RelayDetached),
575 sql.Named("reattach_on", ch.ReattachOn),
576 sql.Named("detach_after", int64(math.Ceil(ch.DetachAfter.Seconds()))),
577 sql.Named("detach_on", ch.DetachOn),
[531]578
[596]579 sql.Named("id", ch.ID), // only for UPDATE
580 }
581
[531]582 var err error
583 if ch.ID != 0 {
[645]584 _, err = db.db.ExecContext(ctx, `UPDATE Channel
[596]585 SET network = :network, name = :name, key = :key, detached = :detached,
586 detached_internal_msgid = :detached_internal_msgid, relay_detached = :relay_detached,
587 reattach_on = :reattach_on, detach_after = :detach_after, detach_on = :detach_on
588 WHERE id = :id`, args...)
[531]589 } else {
590 var res sql.Result
[645]591 res, err = db.db.ExecContext(ctx, `INSERT INTO Channel(network, name, key, detached, detached_internal_msgid, relay_detached, reattach_on, detach_after, detach_on)
[596]592 VALUES (:network, :name, :key, :detached, :detached_internal_msgid, :relay_detached, :reattach_on, :detach_after, :detach_on)`, args...)
[531]593 if err != nil {
594 return err
595 }
596 ch.ID, err = res.LastInsertId()
597 }
598 return err
599}
600
[652]601func (db *SqliteDB) DeleteChannel(ctx context.Context, id int64) error {
[531]602 db.lock.Lock()
603 defer db.lock.Unlock()
604
[652]605 ctx, cancel := context.WithTimeout(ctx, sqliteQueryTimeout)
[645]606 defer cancel()
607
608 _, err := db.db.ExecContext(ctx, "DELETE FROM Channel WHERE id = ?", id)
[531]609 return err
610}
611
[652]612func (db *SqliteDB) ListDeliveryReceipts(ctx context.Context, networkID int64) ([]DeliveryReceipt, error) {
[531]613 db.lock.RLock()
614 defer db.lock.RUnlock()
615
[652]616 ctx, cancel := context.WithTimeout(ctx, sqliteQueryTimeout)
[645]617 defer cancel()
618
619 rows, err := db.db.QueryContext(ctx, `
620 SELECT id, target, client, internal_msgid
[531]621 FROM DeliveryReceipt
622 WHERE network = ?`, networkID)
623 if err != nil {
624 return nil, err
625 }
626 defer rows.Close()
627
628 var receipts []DeliveryReceipt
629 for rows.Next() {
630 var rcpt DeliveryReceipt
631 var client sql.NullString
632 if err := rows.Scan(&rcpt.ID, &rcpt.Target, &client, &rcpt.InternalMsgID); err != nil {
633 return nil, err
634 }
635 rcpt.Client = client.String
636 receipts = append(receipts, rcpt)
637 }
638 if err := rows.Err(); err != nil {
639 return nil, err
640 }
641
642 return receipts, nil
643}
644
[652]645func (db *SqliteDB) StoreClientDeliveryReceipts(ctx context.Context, networkID int64, client string, receipts []DeliveryReceipt) error {
[531]646 db.lock.Lock()
647 defer db.lock.Unlock()
648
[652]649 ctx, cancel := context.WithTimeout(ctx, sqliteQueryTimeout)
[645]650 defer cancel()
651
[531]652 tx, err := db.db.Begin()
653 if err != nil {
654 return err
655 }
656 defer tx.Rollback()
657
[645]658 _, err = tx.ExecContext(ctx, "DELETE FROM DeliveryReceipt WHERE network = ? AND client IS ?",
[531]659 networkID, toNullString(client))
660 if err != nil {
661 return err
662 }
663
664 for i := range receipts {
665 rcpt := &receipts[i]
666
[645]667 res, err := tx.ExecContext(ctx, `
668 INSERT INTO DeliveryReceipt(network, target, client, internal_msgid)
[596]669 VALUES (:network, :target, :client, :internal_msgid)`,
670 sql.Named("network", networkID),
671 sql.Named("target", rcpt.Target),
672 sql.Named("client", toNullString(client)),
673 sql.Named("internal_msgid", rcpt.InternalMsgID))
[531]674 if err != nil {
675 return err
676 }
677 rcpt.ID, err = res.LastInsertId()
678 if err != nil {
679 return err
680 }
681 }
682
683 return tx.Commit()
684}
Note: See TracBrowser for help on using the repository browser.