source: code/trunk/msgstore_fs.go@ 665

Last change on this file since 665 was 665, checked in by delthas, 4 years ago

Add support for draft/event-playback

File size: 17.5 KB
Line 
1package soju
2
3import (
4 "bufio"
5 "fmt"
6 "io"
7 "os"
8 "path/filepath"
9 "sort"
10 "strings"
11 "time"
12
13 "git.sr.ht/~sircmpwn/go-bare"
14 "gopkg.in/irc.v3"
15)
16
// Limits applied by the filesystem message store.
const (
	// fsMessageStoreMaxFiles is the number of log files Append keeps open
	// at once; beyond that the least recently used ones are closed.
	fsMessageStoreMaxFiles = 20

	// fsMessageStoreMaxTries is the number of consecutive days without any
	// matching message after which the history loaders stop looking.
	fsMessageStoreMaxTries = 100
)
21
// filenameEscaper replaces path separators with dashes. A strings.Replacer is
// safe for concurrent use, so a single shared instance serves every call
// instead of building a new one each time.
var filenameEscaper = strings.NewReplacer("/", "-", "\\", "-")

// escapeFilename turns an arbitrary string into a name that can be used as a
// single path element.
//
// The special names "." and ".." map to "-" and "--". In any other string
// every "/" and "\" is replaced with "-". The mapping is lossy: distinct
// inputs can yield the same output and the result cannot be un-escaped.
func escapeFilename(unsafe string) (safe string) {
	switch unsafe {
	case ".":
		return "-"
	case "..":
		return "--"
	}
	return filenameEscaper.Replace(unsafe)
}
31
// date is a calendar day, without a time of day or a time zone.
type date struct {
	Year  int
	Month int
	Day   int
}

// newDate extracts the calendar day of t, as seen in t's own location.
func newDate(t time.Time) date {
	y, m, d := t.Date()
	return date{Year: y, Month: int(m), Day: d}
}

// Time returns midnight at the start of d in the local time zone.
func (d date) Time() time.Time {
	month := time.Month(d.Month)
	return time.Date(d.Year, month, d.Day, 0, 0, 0, 0, time.Local)
}
44
45type fsMsgID struct {
46 Date date
47 Offset bare.Int
48}
49
50func (fsMsgID) msgIDType() msgIDType {
51 return msgIDFS
52}
53
54func parseFSMsgID(s string) (netID int64, entity string, t time.Time, offset int64, err error) {
55 var id fsMsgID
56 netID, entity, err = parseMsgID(s, &id)
57 if err != nil {
58 return 0, "", time.Time{}, 0, err
59 }
60 return netID, entity, id.Date.Time(), int64(id.Offset), nil
61}
62
63func formatFSMsgID(netID int64, entity string, t time.Time, offset int64) string {
64 id := fsMsgID{
65 Date: newDate(t),
66 Offset: bare.Int(offset),
67 }
68 return formatMsgID(netID, entity, &id)
69}
70
// fsMessageStoreFile is an open log file together with the time it was last
// used by Append.
type fsMessageStoreFile struct {
	*os.File

	// lastUse is refreshed on every Append; the files with the oldest
	// lastUse are the first ones to be closed.
	lastUse time.Time
}
75
76// fsMessageStore is a per-user on-disk store for IRC messages.
77//
78// It mimicks the ZNC log layout and format. See the ZNC source:
79// https://github.com/znc/znc/blob/master/modules/log.cpp
80type fsMessageStore struct {
81 root string
82
83 // Write-only files used by Append
84 files map[string]*fsMessageStoreFile // indexed by entity
85}
86
87var _ messageStore = (*fsMessageStore)(nil)
88var _ chatHistoryMessageStore = (*fsMessageStore)(nil)
89
90func newFSMessageStore(root, username string) *fsMessageStore {
91 return &fsMessageStore{
92 root: filepath.Join(root, escapeFilename(username)),
93 files: make(map[string]*fsMessageStoreFile),
94 }
95}
96
97func (ms *fsMessageStore) logPath(network *network, entity string, t time.Time) string {
98 year, month, day := t.Date()
99 filename := fmt.Sprintf("%04d-%02d-%02d.log", year, month, day)
100 return filepath.Join(ms.root, escapeFilename(network.GetName()), escapeFilename(entity), filename)
101}
102
103// nextMsgID queries the message ID for the next message to be written to f.
104func nextFSMsgID(network *network, entity string, t time.Time, f *os.File) (string, error) {
105 offset, err := f.Seek(0, io.SeekEnd)
106 if err != nil {
107 return "", fmt.Errorf("failed to query next FS message ID: %v", err)
108 }
109 return formatFSMsgID(network.ID, entity, t, offset), nil
110}
111
112func (ms *fsMessageStore) LastMsgID(network *network, entity string, t time.Time) (string, error) {
113 p := ms.logPath(network, entity, t)
114 fi, err := os.Stat(p)
115 if os.IsNotExist(err) {
116 return formatFSMsgID(network.ID, entity, t, -1), nil
117 } else if err != nil {
118 return "", fmt.Errorf("failed to query last FS message ID: %v", err)
119 }
120 return formatFSMsgID(network.ID, entity, t, fi.Size()-1), nil
121}
122
123func (ms *fsMessageStore) Append(network *network, entity string, msg *irc.Message) (string, error) {
124 s := formatMessage(msg)
125 if s == "" {
126 return "", nil
127 }
128
129 var t time.Time
130 if tag, ok := msg.Tags["time"]; ok {
131 var err error
132 t, err = time.Parse(serverTimeLayout, string(tag))
133 if err != nil {
134 return "", fmt.Errorf("failed to parse message time tag: %v", err)
135 }
136 t = t.In(time.Local)
137 } else {
138 t = time.Now()
139 }
140
141 f := ms.files[entity]
142
143 // TODO: handle non-monotonic clock behaviour
144 path := ms.logPath(network, entity, t)
145 if f == nil || f.Name() != path {
146 dir := filepath.Dir(path)
147 if err := os.MkdirAll(dir, 0750); err != nil {
148 return "", fmt.Errorf("failed to create message logs directory %q: %v", dir, err)
149 }
150
151 ff, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0640)
152 if err != nil {
153 return "", fmt.Errorf("failed to open message log file %q: %v", path, err)
154 }
155
156 if f != nil {
157 f.Close()
158 }
159 f = &fsMessageStoreFile{File: ff}
160 ms.files[entity] = f
161 }
162
163 f.lastUse = time.Now()
164
165 if len(ms.files) > fsMessageStoreMaxFiles {
166 entities := make([]string, 0, len(ms.files))
167 for name := range ms.files {
168 entities = append(entities, name)
169 }
170 sort.Slice(entities, func(i, j int) bool {
171 a, b := entities[i], entities[j]
172 return ms.files[a].lastUse.Before(ms.files[b].lastUse)
173 })
174 entities = entities[0 : len(entities)-fsMessageStoreMaxFiles]
175 for _, name := range entities {
176 ms.files[name].Close()
177 delete(ms.files, name)
178 }
179 }
180
181 msgID, err := nextFSMsgID(network, entity, t, f.File)
182 if err != nil {
183 return "", fmt.Errorf("failed to generate message ID: %v", err)
184 }
185
186 _, err = fmt.Fprintf(f, "[%02d:%02d:%02d] %s\n", t.Hour(), t.Minute(), t.Second(), s)
187 if err != nil {
188 return "", fmt.Errorf("failed to log message to %q: %v", f.Name(), err)
189 }
190
191 return msgID, nil
192}
193
194func (ms *fsMessageStore) Close() error {
195 var closeErr error
196 for _, f := range ms.files {
197 if err := f.Close(); err != nil {
198 closeErr = fmt.Errorf("failed to close message store: %v", err)
199 }
200 }
201 return closeErr
202}
203
204// formatMessage formats a message log line. It assumes a well-formed IRC
205// message.
206func formatMessage(msg *irc.Message) string {
207 switch strings.ToUpper(msg.Command) {
208 case "NICK":
209 return fmt.Sprintf("*** %s is now known as %s", msg.Prefix.Name, msg.Params[0])
210 case "JOIN":
211 return fmt.Sprintf("*** Joins: %s (%s@%s)", msg.Prefix.Name, msg.Prefix.User, msg.Prefix.Host)
212 case "PART":
213 var reason string
214 if len(msg.Params) > 1 {
215 reason = msg.Params[1]
216 }
217 return fmt.Sprintf("*** Parts: %s (%s@%s) (%s)", msg.Prefix.Name, msg.Prefix.User, msg.Prefix.Host, reason)
218 case "KICK":
219 nick := msg.Params[1]
220 var reason string
221 if len(msg.Params) > 2 {
222 reason = msg.Params[2]
223 }
224 return fmt.Sprintf("*** %s was kicked by %s (%s)", nick, msg.Prefix.Name, reason)
225 case "QUIT":
226 var reason string
227 if len(msg.Params) > 0 {
228 reason = msg.Params[0]
229 }
230 return fmt.Sprintf("*** Quits: %s (%s@%s) (%s)", msg.Prefix.Name, msg.Prefix.User, msg.Prefix.Host, reason)
231 case "TOPIC":
232 var topic string
233 if len(msg.Params) > 1 {
234 topic = msg.Params[1]
235 }
236 return fmt.Sprintf("*** %s changes topic to '%s'", msg.Prefix.Name, topic)
237 case "MODE":
238 return fmt.Sprintf("*** %s sets mode: %s", msg.Prefix.Name, strings.Join(msg.Params[1:], " "))
239 case "NOTICE":
240 return fmt.Sprintf("-%s- %s", msg.Prefix.Name, msg.Params[1])
241 case "PRIVMSG":
242 if cmd, params, ok := parseCTCPMessage(msg); ok && cmd == "ACTION" {
243 return fmt.Sprintf("* %s %s", msg.Prefix.Name, params)
244 } else {
245 return fmt.Sprintf("<%s> %s", msg.Prefix.Name, msg.Params[1])
246 }
247 default:
248 return ""
249 }
250}
251
252func parseMessage(line, entity string, ref time.Time, events bool) (*irc.Message, time.Time, error) {
253 var hour, minute, second int
254 _, err := fmt.Sscanf(line, "[%02d:%02d:%02d] ", &hour, &minute, &second)
255 if err != nil {
256 return nil, time.Time{}, fmt.Errorf("malformed timestamp prefix: %v", err)
257 }
258 line = line[11:]
259
260 var cmd string
261 var prefix *irc.Prefix
262 var params []string
263 if events && strings.HasPrefix(line, "*** ") {
264 parts := strings.SplitN(line[4:], " ", 2)
265 if len(parts) != 2 {
266 return nil, time.Time{}, nil
267 }
268 switch parts[0] {
269 case "Joins:", "Parts:", "Quits:":
270 args := strings.SplitN(parts[1], " ", 3)
271 if len(args) < 2 {
272 return nil, time.Time{}, nil
273 }
274 nick := args[0]
275 mask := strings.TrimSuffix(strings.TrimPrefix(args[1], "("), ")")
276 maskParts := strings.SplitN(mask, "@", 2)
277 if len(maskParts) != 2 {
278 return nil, time.Time{}, nil
279 }
280 prefix = &irc.Prefix{
281 Name: nick,
282 User: maskParts[0],
283 Host: maskParts[1],
284 }
285 var reason string
286 if len(args) > 2 {
287 reason = strings.TrimSuffix(strings.TrimPrefix(args[2], "("), ")")
288 }
289 switch parts[0] {
290 case "Joins:":
291 cmd = "JOIN"
292 params = []string{entity}
293 case "Parts:":
294 cmd = "PART"
295 if reason != "" {
296 params = []string{entity, reason}
297 } else {
298 params = []string{entity}
299 }
300 case "Quits:":
301 cmd = "QUIT"
302 if reason != "" {
303 params = []string{reason}
304 }
305 }
306 default:
307 nick := parts[0]
308 rem := parts[1]
309 if r := strings.TrimPrefix(rem, "is now known as "); r != rem {
310 cmd = "NICK"
311 prefix = &irc.Prefix{
312 Name: nick,
313 }
314 params = []string{r}
315 } else if r := strings.TrimPrefix(rem, "was kicked by "); r != rem {
316 args := strings.SplitN(r, " ", 2)
317 if len(args) != 2 {
318 return nil, time.Time{}, nil
319 }
320 cmd = "KICK"
321 prefix = &irc.Prefix{
322 Name: args[0],
323 }
324 reason := strings.TrimSuffix(strings.TrimPrefix(args[1], "("), ")")
325 params = []string{entity, nick}
326 if reason != "" {
327 params = append(params, reason)
328 }
329 } else if r := strings.TrimPrefix(rem, "changes topic to "); r != rem {
330 cmd = "TOPIC"
331 prefix = &irc.Prefix{
332 Name: nick,
333 }
334 topic := strings.TrimSuffix(strings.TrimPrefix(r, "'"), "'")
335 params = []string{entity, topic}
336 } else if r := strings.TrimPrefix(rem, "sets mode: "); r != rem {
337 cmd = "MODE"
338 prefix = &irc.Prefix{
339 Name: nick,
340 }
341 params = append([]string{entity}, strings.Split(r, " ")...)
342 } else {
343 return nil, time.Time{}, nil
344 }
345 }
346 } else {
347 var sender, text string
348 if strings.HasPrefix(line, "<") {
349 cmd = "PRIVMSG"
350 parts := strings.SplitN(line[1:], "> ", 2)
351 if len(parts) != 2 {
352 return nil, time.Time{}, nil
353 }
354 sender, text = parts[0], parts[1]
355 } else if strings.HasPrefix(line, "-") {
356 cmd = "NOTICE"
357 parts := strings.SplitN(line[1:], "- ", 2)
358 if len(parts) != 2 {
359 return nil, time.Time{}, nil
360 }
361 sender, text = parts[0], parts[1]
362 } else if strings.HasPrefix(line, "* ") {
363 cmd = "PRIVMSG"
364 parts := strings.SplitN(line[2:], " ", 2)
365 if len(parts) != 2 {
366 return nil, time.Time{}, nil
367 }
368 sender, text = parts[0], "\x01ACTION "+parts[1]+"\x01"
369 } else {
370 return nil, time.Time{}, nil
371 }
372
373 prefix = &irc.Prefix{Name: sender}
374 params = []string{entity, text}
375 }
376
377 year, month, day := ref.Date()
378 t := time.Date(year, month, day, hour, minute, second, 0, time.Local)
379
380 msg := &irc.Message{
381 Tags: map[string]irc.TagValue{
382 "time": irc.TagValue(t.UTC().Format(serverTimeLayout)),
383 },
384 Prefix: prefix,
385 Command: cmd,
386 Params: params,
387 }
388 return msg, t, nil
389}
390
391func (ms *fsMessageStore) parseMessagesBefore(network *network, entity string, ref time.Time, end time.Time, events bool, limit int, afterOffset int64) ([]*irc.Message, error) {
392 path := ms.logPath(network, entity, ref)
393 f, err := os.Open(path)
394 if err != nil {
395 if os.IsNotExist(err) {
396 return nil, nil
397 }
398 return nil, fmt.Errorf("failed to parse messages before ref: %v", err)
399 }
400 defer f.Close()
401
402 historyRing := make([]*irc.Message, limit)
403 cur := 0
404
405 sc := bufio.NewScanner(f)
406
407 if afterOffset >= 0 {
408 if _, err := f.Seek(afterOffset, io.SeekStart); err != nil {
409 return nil, nil
410 }
411 sc.Scan() // skip till next newline
412 }
413
414 for sc.Scan() {
415 msg, t, err := parseMessage(sc.Text(), entity, ref, events)
416 if err != nil {
417 return nil, err
418 } else if msg == nil || !t.After(end) {
419 continue
420 } else if !t.Before(ref) {
421 break
422 }
423
424 historyRing[cur%limit] = msg
425 cur++
426 }
427 if sc.Err() != nil {
428 return nil, fmt.Errorf("failed to parse messages before ref: scanner error: %v", sc.Err())
429 }
430
431 n := limit
432 if cur < limit {
433 n = cur
434 }
435 start := (cur - n + limit) % limit
436
437 if start+n <= limit { // ring doesnt wrap
438 return historyRing[start : start+n], nil
439 } else { // ring wraps
440 history := make([]*irc.Message, n)
441 r := copy(history, historyRing[start:])
442 copy(history[r:], historyRing[:n-r])
443 return history, nil
444 }
445}
446
447func (ms *fsMessageStore) parseMessagesAfter(network *network, entity string, ref time.Time, end time.Time, events bool, limit int) ([]*irc.Message, error) {
448 path := ms.logPath(network, entity, ref)
449 f, err := os.Open(path)
450 if err != nil {
451 if os.IsNotExist(err) {
452 return nil, nil
453 }
454 return nil, fmt.Errorf("failed to parse messages after ref: %v", err)
455 }
456 defer f.Close()
457
458 var history []*irc.Message
459 sc := bufio.NewScanner(f)
460 for sc.Scan() && len(history) < limit {
461 msg, t, err := parseMessage(sc.Text(), entity, ref, events)
462 if err != nil {
463 return nil, err
464 } else if msg == nil || !t.After(ref) {
465 continue
466 } else if !t.Before(end) {
467 break
468 }
469
470 history = append(history, msg)
471 }
472 if sc.Err() != nil {
473 return nil, fmt.Errorf("failed to parse messages after ref: scanner error: %v", sc.Err())
474 }
475
476 return history, nil
477}
478
479func (ms *fsMessageStore) LoadBeforeTime(network *network, entity string, start time.Time, end time.Time, limit int, events bool) ([]*irc.Message, error) {
480 start = start.In(time.Local)
481 end = end.In(time.Local)
482 history := make([]*irc.Message, limit)
483 remaining := limit
484 tries := 0
485 for remaining > 0 && tries < fsMessageStoreMaxTries && end.Before(start) {
486 buf, err := ms.parseMessagesBefore(network, entity, start, end, events, remaining, -1)
487 if err != nil {
488 return nil, err
489 }
490 if len(buf) == 0 {
491 tries++
492 } else {
493 tries = 0
494 }
495 copy(history[remaining-len(buf):], buf)
496 remaining -= len(buf)
497 year, month, day := start.Date()
498 start = time.Date(year, month, day, 0, 0, 0, 0, start.Location()).Add(-1)
499 }
500
501 return history[remaining:], nil
502}
503
504func (ms *fsMessageStore) LoadAfterTime(network *network, entity string, start time.Time, end time.Time, limit int, events bool) ([]*irc.Message, error) {
505 start = start.In(time.Local)
506 end = end.In(time.Local)
507 var history []*irc.Message
508 remaining := limit
509 tries := 0
510 for remaining > 0 && tries < fsMessageStoreMaxTries && start.Before(end) {
511 buf, err := ms.parseMessagesAfter(network, entity, start, end, events, remaining)
512 if err != nil {
513 return nil, err
514 }
515 if len(buf) == 0 {
516 tries++
517 } else {
518 tries = 0
519 }
520 history = append(history, buf...)
521 remaining -= len(buf)
522 year, month, day := start.Date()
523 start = time.Date(year, month, day+1, 0, 0, 0, 0, start.Location())
524 }
525 return history, nil
526}
527
528func (ms *fsMessageStore) LoadLatestID(network *network, entity, id string, limit int) ([]*irc.Message, error) {
529 var afterTime time.Time
530 var afterOffset int64
531 if id != "" {
532 var idNet int64
533 var idEntity string
534 var err error
535 idNet, idEntity, afterTime, afterOffset, err = parseFSMsgID(id)
536 if err != nil {
537 return nil, err
538 }
539 if idNet != network.ID || idEntity != entity {
540 return nil, fmt.Errorf("cannot find message ID: message ID doesn't match network/entity")
541 }
542 }
543
544 history := make([]*irc.Message, limit)
545 t := time.Now()
546 remaining := limit
547 tries := 0
548 for remaining > 0 && tries < fsMessageStoreMaxTries && !truncateDay(t).Before(afterTime) {
549 var offset int64 = -1
550 if afterOffset >= 0 && truncateDay(t).Equal(afterTime) {
551 offset = afterOffset
552 }
553
554 buf, err := ms.parseMessagesBefore(network, entity, t, time.Time{}, false, remaining, offset)
555 if err != nil {
556 return nil, err
557 }
558 if len(buf) == 0 {
559 tries++
560 } else {
561 tries = 0
562 }
563 copy(history[remaining-len(buf):], buf)
564 remaining -= len(buf)
565 year, month, day := t.Date()
566 t = time.Date(year, month, day, 0, 0, 0, 0, t.Location()).Add(-1)
567 }
568
569 return history[remaining:], nil
570}
571
572func (ms *fsMessageStore) ListTargets(network *network, start, end time.Time, limit int, events bool) ([]chatHistoryTarget, error) {
573 start = start.In(time.Local)
574 end = end.In(time.Local)
575 rootPath := filepath.Join(ms.root, escapeFilename(network.GetName()))
576 root, err := os.Open(rootPath)
577 if os.IsNotExist(err) {
578 return nil, nil
579 } else if err != nil {
580 return nil, err
581 }
582
583 // The returned targets are escaped, and there is no way to un-escape
584 // TODO: switch to ReadDir (Go 1.16+)
585 targetNames, err := root.Readdirnames(0)
586 root.Close()
587 if err != nil {
588 return nil, err
589 }
590
591 var targets []chatHistoryTarget
592 for _, target := range targetNames {
593 // target is already escaped here
594 targetPath := filepath.Join(rootPath, target)
595 targetDir, err := os.Open(targetPath)
596 if err != nil {
597 return nil, err
598 }
599
600 entries, err := targetDir.Readdir(0)
601 targetDir.Close()
602 if err != nil {
603 return nil, err
604 }
605
606 // We use mtime here, which may give imprecise or incorrect results
607 var t time.Time
608 for _, entry := range entries {
609 if entry.ModTime().After(t) {
610 t = entry.ModTime()
611 }
612 }
613
614 // The timestamps we get from logs have second granularity
615 t = truncateSecond(t)
616
617 // Filter out targets that don't fullfil the time bounds
618 if !isTimeBetween(t, start, end) {
619 continue
620 }
621
622 targets = append(targets, chatHistoryTarget{
623 Name: target,
624 LatestMessage: t,
625 })
626 }
627
628 // Sort targets by latest message time, backwards or forwards depending on
629 // the order of the time bounds
630 sort.Slice(targets, func(i, j int) bool {
631 t1, t2 := targets[i].LatestMessage, targets[j].LatestMessage
632 if start.Before(end) {
633 return t1.Before(t2)
634 } else {
635 return !t1.Before(t2)
636 }
637 })
638
639 // Truncate the result if necessary
640 if len(targets) > limit {
641 targets = targets[:limit]
642 }
643
644 return targets, nil
645}
646
647func (ms *fsMessageStore) RenameNetwork(oldNet, newNet *network) error {
648 oldDir := filepath.Join(ms.root, escapeFilename(oldNet.GetName()))
649 newDir := filepath.Join(ms.root, escapeFilename(newNet.GetName()))
650 // Avoid loosing data by overwriting an existing directory
651 if _, err := os.Stat(newDir); err == nil {
652 return fmt.Errorf("destination %q already exists", newDir)
653 }
654 return os.Rename(oldDir, newDir)
655}
656
// truncateDay returns midnight at the start of the calendar day of t, in the
// location of t.
func truncateDay(t time.Time) time.Time {
	y, m, d := t.Date()
	loc := t.Location()
	return time.Date(y, m, d, 0, 0, 0, 0, loc)
}
661
// truncateSecond returns t with its sub-second part dropped, in the location
// of t.
func truncateSecond(t time.Time) time.Time {
	y, mo, d := t.Date()
	h, mi, s := t.Clock()
	return time.Date(y, mo, d, h, mi, s, 0, t.Location())
}
666
// isTimeBetween reports whether t lies strictly between start and end. The
// two bounds may be given in either order.
func isTimeBetween(t, start, end time.Time) bool {
	lo, hi := start, end
	if hi.Before(lo) {
		lo, hi = hi, lo
	}
	return t.After(lo) && hi.After(t)
}
Note: See TracBrowser for help on using the repository browser.