Legend:
- Unmodified
- Added
- Removed
-
trunk/db.go
r457 r489 119 119 DetachAfter time.Duration 120 120 DetachOn MessageFilter 121 } 122 123 type DeliveryReceipt struct { 124 ID int64 125 Target string // channel or nick 126 Client string 127 InternalMsgID string 121 128 } 122 129 … … 161 168 FOREIGN KEY(network) REFERENCES Network(id), 162 169 UNIQUE(network, name) 170 ); 171 172 CREATE TABLE DeliveryReceipt ( 173 id INTEGER PRIMARY KEY, 174 network INTEGER NOT NULL, 175 target VARCHAR(255) NOT NULL, 176 client VARCHAR(255), 177 internal_msgid VARCHAR(255) NOT NULL, 178 FOREIGN KEY(network) REFERENCES Network(id), 179 UNIQUE(network, target, client) 163 180 ); 164 181 ` … … 218 235 ALTER TABLE Channel ADD COLUMN detach_on INTEGER NOT NULL DEFAULT 0; 219 236 `, 237 ` 238 CREATE TABLE DeliveryReceipt ( 239 id INTEGER PRIMARY KEY, 240 network INTEGER NOT NULL, 241 target VARCHAR(255) NOT NULL, 242 client VARCHAR(255), 243 internal_msgid VARCHAR(255) NOT NULL, 244 FOREIGN KEY(network) REFERENCES Network(id), 245 UNIQUE(network, target, client) 246 ); 247 `, 220 248 } 221 249 … … 579 607 return err 580 608 } 609 610 func (db *DB) ListDeliveryReceipts(networkID int64) ([]DeliveryReceipt, error) { 611 db.lock.RLock() 612 defer db.lock.RUnlock() 613 614 rows, err := db.db.Query(`SELECT id, target, client, internal_msgid 615 FROM DeliveryReceipt 616 WHERE network = ?`, networkID) 617 if err != nil { 618 return nil, err 619 } 620 defer rows.Close() 621 622 var receipts []DeliveryReceipt 623 for rows.Next() { 624 var rcpt DeliveryReceipt 625 var client sql.NullString 626 if err := rows.Scan(&rcpt.ID, &rcpt.Target, &client, &rcpt.InternalMsgID); err != nil { 627 return nil, err 628 } 629 rcpt.Client = client.String 630 receipts = append(receipts, rcpt) 631 } 632 if err := rows.Err(); err != nil { 633 return nil, err 634 } 635 636 return receipts, nil 637 } 638 639 func (db *DB) StoreClientDeliveryReceipts(networkID int64, client string, receipts []DeliveryReceipt) error { 640 db.lock.Lock() 641 defer db.lock.Unlock() 642 643 tx, err := db.db.Begin() 644 if err != nil { 645 return err 646 } 647 defer tx.Rollback() 648 649 _, err = tx.Exec("DELETE FROM DeliveryReceipt WHERE network = ? AND client = ?", 650 networkID, toNullString(client)) 651 if err != nil { 652 return err 653 } 654 655 for i := range receipts { 656 rcpt := &receipts[i] 657 658 res, err := tx.Exec("INSERT INTO DeliveryReceipt(network, target, client, internal_msgid) VALUES (?, ?, ?, ?)", 659 networkID, rcpt.Target, toNullString(client), rcpt.InternalMsgID) 660 if err != nil { 661 return err 662 } 663 rcpt.ID, err = res.LastInsertId() 664 if err != nil { 665 return err 666 } 667 } 668 669 return tx.Commit() 670 } -
trunk/upstream.go
r487 r489 1753 1753 } 1754 1754 1755 for clientName, _ := range uc.user.clientNames{1755 uc.network.delivered.ForEachClient(func(clientName string) { 1756 1756 uc.network.delivered.StoreID(entity, clientName, lastID) 1757 } 1757 }) 1758 1758 } 1759 1759 -
trunk/user.go
r487 r489 90 90 for _, entry := range ds.m.innerMap { 91 91 f(entry.originalKey) 92 } 93 } 94 95 func (ds deliveredStore) ForEachClient(f func(clientName string)) { 96 clients := make(map[string]struct{}) 97 for _, entry := range ds.m.innerMap { 98 delivered := entry.value.(deliveredClientMap) 99 for clientName := range delivered { 100 clients[clientName] = struct{}{} 101 } 102 } 103 104 for clientName := range clients { 105 f(clientName) 92 106 } 93 107 } … … 299 313 } 300 314 315 func (net *network) storeClientDeliveryReceipts(clientName string) { 316 if !net.user.hasPersistentMsgStore() { 317 return 318 } 319 320 var receipts []DeliveryReceipt 321 net.delivered.ForEachTarget(func(target string) { 322 msgID := net.delivered.LoadID(target, clientName) 323 if msgID == "" { 324 return 325 } 326 receipts = append(receipts, DeliveryReceipt{ 327 Target: target, 328 InternalMsgID: msgID, 329 }) 330 }) 331 332 if err := net.user.srv.db.StoreClientDeliveryReceipts(net.ID, clientName, receipts); err != nil { 333 net.user.srv.Logger.Printf("failed to store delivery receipts for user %q, client %q, network %q: %v", net.user.Username, clientName, net.GetName(), err) 334 } 335 } 336 301 337 type user struct { 302 338 User … … 309 345 downstreamConns []*downstreamConn 310 346 msgStore messageStore 311 clientNames map[string]struct{}312 347 313 348 // LIST commands in progress … … 330 365 331 366 return &user{ 332 User: *record, 333 srv: srv, 334 events: make(chan event, 64), 335 done: make(chan struct{}), 336 msgStore: msgStore, 337 clientNames: make(map[string]struct{}), 367 User: *record, 368 srv: srv, 369 events: make(chan event, 64), 370 done: make(chan struct{}), 371 msgStore: msgStore, 338 372 } 339 373 } … … 407 441 network := newNetwork(u, &record, channels) 408 442 u.networks = append(u.networks, network) 443 444 if u.hasPersistentMsgStore() { 445 receipts, err := u.srv.db.ListDeliveryReceipts(record.ID) 446 if err != nil { 447 u.srv.Logger.Printf("failed to load delivery receipts for user %q, network %q: %v", u.Username, network.GetName(), err) 448 return 449 } 450 451 for _, rcpt := range receipts { 452 network.delivered.StoreID(rcpt.Target, rcpt.Client, rcpt.InternalMsgID) 453 } 454 } 409 455 410 456 go network.run() … … 490 536 uc.updateAway() 491 537 }) 492 493 u.clientNames[dc.clientName] = struct{}{}494 538 case eventDownstreamDisconnected: 495 539 dc := e.dc … … 501 545 } 502 546 } 547 548 dc.forEachNetwork(func(net *network) { 549 net.storeClientDeliveryReceipts(dc.clientName) 550 }) 503 551 504 552 u.forEachUpstream(func(uc *upstreamConn) { … … 525 573 for _, n := range u.networks { 526 574 n.stop() 575 576 n.delivered.ForEachClient(func(clientName string) { 577 n.storeClientDeliveryReceipts(clientName) 578 }) 527 579 } 528 580 return … … 666 718 <-u.done 667 719 } 720 721 func (u *user) hasPersistentMsgStore() bool { 722 if u.msgStore == nil { 723 return false 724 } 725 _, isMem := u.msgStore.(*memoryMessageStore) 726 return !isMem 727 }
Note:
See TracChangeset
for help on using the changeset viewer.