source: code/trunk/msgstore_fs.go@ 666

Last change on this file since 666 was 666, checked in by contact, 4 years ago

msgstore: take Network as arg instead of network

The message stores don't need to access the internal network
struct; they just need network metadata such as ID and name.

This can ease moving message stores into a separate package in the
future.

File size: 17.5 KB
Line 
1package soju
2
3import (
4 "bufio"
5 "fmt"
6 "io"
7 "os"
8 "path/filepath"
9 "sort"
10 "strings"
11 "time"
12
13 "git.sr.ht/~sircmpwn/go-bare"
14 "gopkg.in/irc.v3"
15)
16
// fsMessageStoreMaxFiles caps how many log files Append keeps open for
// writing; beyond it, the least recently used files are closed.
const fsMessageStoreMaxFiles = 20

// fsMessageStoreMaxTries is how many consecutive days without any matching
// message the history loaders scan before giving up.
const fsMessageStoreMaxTries = 100
21
// fsFilenameReplacer maps both path separators to "-". It is built once and
// shared, instead of being rebuilt on every escapeFilename call (which runs
// for every appended message); a strings.Replacer is safe for concurrent use.
var fsFilenameReplacer = strings.NewReplacer("/", "-", "\\", "-")

// escapeFilename turns an arbitrary name (user, network or entity name) into
// a single path element that is safe to pass to filepath.Join: "/" and "\"
// are replaced by "-", and the special names "." and ".." are mapped to "-"
// and "--" so that they cannot refer to the current or parent directory.
func escapeFilename(unsafe string) (safe string) {
	switch unsafe {
	case ".":
		return "-"
	case "..":
		return "--"
	}
	return fsFilenameReplacer.Replace(unsafe)
}
31
// date is a calendar day. It is stored inside FS message IDs to identify the
// daily log file a message was written to.
type date struct {
	Year  int
	Month int
	Day   int
}

// newDate returns the calendar day of t, as seen in t's own location.
func newDate(t time.Time) date {
	y, m, d := t.Date()
	return date{Year: y, Month: int(m), Day: d}
}

// Time returns midnight at the start of d in the local time zone.
func (d date) Time() time.Time {
	month := time.Month(d.Month)
	return time.Date(d.Year, month, d.Day, 0, 0, 0, 0, time.Local)
}
44
45type fsMsgID struct {
46 Date date
47 Offset bare.Int
48}
49
50func (fsMsgID) msgIDType() msgIDType {
51 return msgIDFS
52}
53
54func parseFSMsgID(s string) (netID int64, entity string, t time.Time, offset int64, err error) {
55 var id fsMsgID
56 netID, entity, err = parseMsgID(s, &id)
57 if err != nil {
58 return 0, "", time.Time{}, 0, err
59 }
60 return netID, entity, id.Date.Time(), int64(id.Offset), nil
61}
62
63func formatFSMsgID(netID int64, entity string, t time.Time, offset int64) string {
64 id := fsMsgID{
65 Date: newDate(t),
66 Offset: bare.Int(offset),
67 }
68 return formatMsgID(netID, entity, &id)
69}
70
// fsMessageStoreFile is a log file held open by Append, together with the
// last time Append used it. lastUse decides which files get closed first
// once more than fsMessageStoreMaxFiles are open.
type fsMessageStoreFile struct {
	*os.File

	lastUse time.Time
}
75
76// fsMessageStore is a per-user on-disk store for IRC messages.
77//
78// It mimicks the ZNC log layout and format. See the ZNC source:
79// https://github.com/znc/znc/blob/master/modules/log.cpp
80type fsMessageStore struct {
81 root string
82
83 // Write-only files used by Append
84 files map[string]*fsMessageStoreFile // indexed by entity
85}
86
87var _ messageStore = (*fsMessageStore)(nil)
88var _ chatHistoryMessageStore = (*fsMessageStore)(nil)
89
90func newFSMessageStore(root, username string) *fsMessageStore {
91 return &fsMessageStore{
92 root: filepath.Join(root, escapeFilename(username)),
93 files: make(map[string]*fsMessageStoreFile),
94 }
95}
96
97func (ms *fsMessageStore) logPath(network *Network, entity string, t time.Time) string {
98 year, month, day := t.Date()
99 filename := fmt.Sprintf("%04d-%02d-%02d.log", year, month, day)
100 return filepath.Join(ms.root, escapeFilename(network.GetName()), escapeFilename(entity), filename)
101}
102
103// nextMsgID queries the message ID for the next message to be written to f.
104func nextFSMsgID(network *Network, entity string, t time.Time, f *os.File) (string, error) {
105 offset, err := f.Seek(0, io.SeekEnd)
106 if err != nil {
107 return "", fmt.Errorf("failed to query next FS message ID: %v", err)
108 }
109 return formatFSMsgID(network.ID, entity, t, offset), nil
110}
111
112func (ms *fsMessageStore) LastMsgID(network *Network, entity string, t time.Time) (string, error) {
113 p := ms.logPath(network, entity, t)
114 fi, err := os.Stat(p)
115 if os.IsNotExist(err) {
116 return formatFSMsgID(network.ID, entity, t, -1), nil
117 } else if err != nil {
118 return "", fmt.Errorf("failed to query last FS message ID: %v", err)
119 }
120 return formatFSMsgID(network.ID, entity, t, fi.Size()-1), nil
121}
122
123func (ms *fsMessageStore) Append(network *Network, entity string, msg *irc.Message) (string, error) {
124 s := formatMessage(msg)
125 if s == "" {
126 return "", nil
127 }
128
129 var t time.Time
130 if tag, ok := msg.Tags["time"]; ok {
131 var err error
132 t, err = time.Parse(serverTimeLayout, string(tag))
133 if err != nil {
134 return "", fmt.Errorf("failed to parse message time tag: %v", err)
135 }
136 t = t.In(time.Local)
137 } else {
138 t = time.Now()
139 }
140
141 f := ms.files[entity]
142
143 // TODO: handle non-monotonic clock behaviour
144 path := ms.logPath(network, entity, t)
145 if f == nil || f.Name() != path {
146 dir := filepath.Dir(path)
147 if err := os.MkdirAll(dir, 0750); err != nil {
148 return "", fmt.Errorf("failed to create message logs directory %q: %v", dir, err)
149 }
150
151 ff, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0640)
152 if err != nil {
153 return "", fmt.Errorf("failed to open message log file %q: %v", path, err)
154 }
155
156 if f != nil {
157 f.Close()
158 }
159 f = &fsMessageStoreFile{File: ff}
160 ms.files[entity] = f
161 }
162
163 f.lastUse = time.Now()
164
165 if len(ms.files) > fsMessageStoreMaxFiles {
166 entities := make([]string, 0, len(ms.files))
167 for name := range ms.files {
168 entities = append(entities, name)
169 }
170 sort.Slice(entities, func(i, j int) bool {
171 a, b := entities[i], entities[j]
172 return ms.files[a].lastUse.Before(ms.files[b].lastUse)
173 })
174 entities = entities[0 : len(entities)-fsMessageStoreMaxFiles]
175 for _, name := range entities {
176 ms.files[name].Close()
177 delete(ms.files, name)
178 }
179 }
180
181 msgID, err := nextFSMsgID(network, entity, t, f.File)
182 if err != nil {
183 return "", fmt.Errorf("failed to generate message ID: %v", err)
184 }
185
186 _, err = fmt.Fprintf(f, "[%02d:%02d:%02d] %s\n", t.Hour(), t.Minute(), t.Second(), s)
187 if err != nil {
188 return "", fmt.Errorf("failed to log message to %q: %v", f.Name(), err)
189 }
190
191 return msgID, nil
192}
193
194func (ms *fsMessageStore) Close() error {
195 var closeErr error
196 for _, f := range ms.files {
197 if err := f.Close(); err != nil {
198 closeErr = fmt.Errorf("failed to close message store: %v", err)
199 }
200 }
201 return closeErr
202}
203
204// formatMessage formats a message log line. It assumes a well-formed IRC
205// message.
206func formatMessage(msg *irc.Message) string {
207 switch strings.ToUpper(msg.Command) {
208 case "NICK":
209 return fmt.Sprintf("*** %s is now known as %s", msg.Prefix.Name, msg.Params[0])
210 case "JOIN":
211 return fmt.Sprintf("*** Joins: %s (%s@%s)", msg.Prefix.Name, msg.Prefix.User, msg.Prefix.Host)
212 case "PART":
213 var reason string
214 if len(msg.Params) > 1 {
215 reason = msg.Params[1]
216 }
217 return fmt.Sprintf("*** Parts: %s (%s@%s) (%s)", msg.Prefix.Name, msg.Prefix.User, msg.Prefix.Host, reason)
218 case "KICK":
219 nick := msg.Params[1]
220 var reason string
221 if len(msg.Params) > 2 {
222 reason = msg.Params[2]
223 }
224 return fmt.Sprintf("*** %s was kicked by %s (%s)", nick, msg.Prefix.Name, reason)
225 case "QUIT":
226 var reason string
227 if len(msg.Params) > 0 {
228 reason = msg.Params[0]
229 }
230 return fmt.Sprintf("*** Quits: %s (%s@%s) (%s)", msg.Prefix.Name, msg.Prefix.User, msg.Prefix.Host, reason)
231 case "TOPIC":
232 var topic string
233 if len(msg.Params) > 1 {
234 topic = msg.Params[1]
235 }
236 return fmt.Sprintf("*** %s changes topic to '%s'", msg.Prefix.Name, topic)
237 case "MODE":
238 return fmt.Sprintf("*** %s sets mode: %s", msg.Prefix.Name, strings.Join(msg.Params[1:], " "))
239 case "NOTICE":
240 return fmt.Sprintf("-%s- %s", msg.Prefix.Name, msg.Params[1])
241 case "PRIVMSG":
242 if cmd, params, ok := parseCTCPMessage(msg); ok && cmd == "ACTION" {
243 return fmt.Sprintf("* %s %s", msg.Prefix.Name, params)
244 } else {
245 return fmt.Sprintf("<%s> %s", msg.Prefix.Name, msg.Params[1])
246 }
247 default:
248 return ""
249 }
250}
251
252func parseMessage(line, entity string, ref time.Time, events bool) (*irc.Message, time.Time, error) {
253 var hour, minute, second int
254 _, err := fmt.Sscanf(line, "[%02d:%02d:%02d] ", &hour, &minute, &second)
255 if err != nil {
256 return nil, time.Time{}, fmt.Errorf("malformed timestamp prefix: %v", err)
257 }
258 line = line[11:]
259
260 var cmd string
261 var prefix *irc.Prefix
262 var params []string
263 if events && strings.HasPrefix(line, "*** ") {
264 parts := strings.SplitN(line[4:], " ", 2)
265 if len(parts) != 2 {
266 return nil, time.Time{}, nil
267 }
268 switch parts[0] {
269 case "Joins:", "Parts:", "Quits:":
270 args := strings.SplitN(parts[1], " ", 3)
271 if len(args) < 2 {
272 return nil, time.Time{}, nil
273 }
274 nick := args[0]
275 mask := strings.TrimSuffix(strings.TrimPrefix(args[1], "("), ")")
276 maskParts := strings.SplitN(mask, "@", 2)
277 if len(maskParts) != 2 {
278 return nil, time.Time{}, nil
279 }
280 prefix = &irc.Prefix{
281 Name: nick,
282 User: maskParts[0],
283 Host: maskParts[1],
284 }
285 var reason string
286 if len(args) > 2 {
287 reason = strings.TrimSuffix(strings.TrimPrefix(args[2], "("), ")")
288 }
289 switch parts[0] {
290 case "Joins:":
291 cmd = "JOIN"
292 params = []string{entity}
293 case "Parts:":
294 cmd = "PART"
295 if reason != "" {
296 params = []string{entity, reason}
297 } else {
298 params = []string{entity}
299 }
300 case "Quits:":
301 cmd = "QUIT"
302 if reason != "" {
303 params = []string{reason}
304 }
305 }
306 default:
307 nick := parts[0]
308 rem := parts[1]
309 if r := strings.TrimPrefix(rem, "is now known as "); r != rem {
310 cmd = "NICK"
311 prefix = &irc.Prefix{
312 Name: nick,
313 }
314 params = []string{r}
315 } else if r := strings.TrimPrefix(rem, "was kicked by "); r != rem {
316 args := strings.SplitN(r, " ", 2)
317 if len(args) != 2 {
318 return nil, time.Time{}, nil
319 }
320 cmd = "KICK"
321 prefix = &irc.Prefix{
322 Name: args[0],
323 }
324 reason := strings.TrimSuffix(strings.TrimPrefix(args[1], "("), ")")
325 params = []string{entity, nick}
326 if reason != "" {
327 params = append(params, reason)
328 }
329 } else if r := strings.TrimPrefix(rem, "changes topic to "); r != rem {
330 cmd = "TOPIC"
331 prefix = &irc.Prefix{
332 Name: nick,
333 }
334 topic := strings.TrimSuffix(strings.TrimPrefix(r, "'"), "'")
335 params = []string{entity, topic}
336 } else if r := strings.TrimPrefix(rem, "sets mode: "); r != rem {
337 cmd = "MODE"
338 prefix = &irc.Prefix{
339 Name: nick,
340 }
341 params = append([]string{entity}, strings.Split(r, " ")...)
342 } else {
343 return nil, time.Time{}, nil
344 }
345 }
346 } else {
347 var sender, text string
348 if strings.HasPrefix(line, "<") {
349 cmd = "PRIVMSG"
350 parts := strings.SplitN(line[1:], "> ", 2)
351 if len(parts) != 2 {
352 return nil, time.Time{}, nil
353 }
354 sender, text = parts[0], parts[1]
355 } else if strings.HasPrefix(line, "-") {
356 cmd = "NOTICE"
357 parts := strings.SplitN(line[1:], "- ", 2)
358 if len(parts) != 2 {
359 return nil, time.Time{}, nil
360 }
361 sender, text = parts[0], parts[1]
362 } else if strings.HasPrefix(line, "* ") {
363 cmd = "PRIVMSG"
364 parts := strings.SplitN(line[2:], " ", 2)
365 if len(parts) != 2 {
366 return nil, time.Time{}, nil
367 }
368 sender, text = parts[0], "\x01ACTION "+parts[1]+"\x01"
369 } else {
370 return nil, time.Time{}, nil
371 }
372
373 prefix = &irc.Prefix{Name: sender}
374 params = []string{entity, text}
375 }
376
377 year, month, day := ref.Date()
378 t := time.Date(year, month, day, hour, minute, second, 0, time.Local)
379
380 msg := &irc.Message{
381 Tags: map[string]irc.TagValue{
382 "time": irc.TagValue(t.UTC().Format(serverTimeLayout)),
383 },
384 Prefix: prefix,
385 Command: cmd,
386 Params: params,
387 }
388 return msg, t, nil
389}
390
391func (ms *fsMessageStore) parseMessagesBefore(network *Network, entity string, ref time.Time, end time.Time, events bool, limit int, afterOffset int64) ([]*irc.Message, error) {
392 path := ms.logPath(network, entity, ref)
393 f, err := os.Open(path)
394 if err != nil {
395 if os.IsNotExist(err) {
396 return nil, nil
397 }
398 return nil, fmt.Errorf("failed to parse messages before ref: %v", err)
399 }
400 defer f.Close()
401
402 historyRing := make([]*irc.Message, limit)
403 cur := 0
404
405 sc := bufio.NewScanner(f)
406
407 if afterOffset >= 0 {
408 if _, err := f.Seek(afterOffset, io.SeekStart); err != nil {
409 return nil, nil
410 }
411 sc.Scan() // skip till next newline
412 }
413
414 for sc.Scan() {
415 msg, t, err := parseMessage(sc.Text(), entity, ref, events)
416 if err != nil {
417 return nil, err
418 } else if msg == nil || !t.After(end) {
419 continue
420 } else if !t.Before(ref) {
421 break
422 }
423
424 historyRing[cur%limit] = msg
425 cur++
426 }
427 if sc.Err() != nil {
428 return nil, fmt.Errorf("failed to parse messages before ref: scanner error: %v", sc.Err())
429 }
430
431 n := limit
432 if cur < limit {
433 n = cur
434 }
435 start := (cur - n + limit) % limit
436
437 if start+n <= limit { // ring doesnt wrap
438 return historyRing[start : start+n], nil
439 } else { // ring wraps
440 history := make([]*irc.Message, n)
441 r := copy(history, historyRing[start:])
442 copy(history[r:], historyRing[:n-r])
443 return history, nil
444 }
445}
446
447func (ms *fsMessageStore) parseMessagesAfter(network *Network, entity string, ref time.Time, end time.Time, events bool, limit int) ([]*irc.Message, error) {
448 path := ms.logPath(network, entity, ref)
449 f, err := os.Open(path)
450 if err != nil {
451 if os.IsNotExist(err) {
452 return nil, nil
453 }
454 return nil, fmt.Errorf("failed to parse messages after ref: %v", err)
455 }
456 defer f.Close()
457
458 var history []*irc.Message
459 sc := bufio.NewScanner(f)
460 for sc.Scan() && len(history) < limit {
461 msg, t, err := parseMessage(sc.Text(), entity, ref, events)
462 if err != nil {
463 return nil, err
464 } else if msg == nil || !t.After(ref) {
465 continue
466 } else if !t.Before(end) {
467 break
468 }
469
470 history = append(history, msg)
471 }
472 if sc.Err() != nil {
473 return nil, fmt.Errorf("failed to parse messages after ref: scanner error: %v", sc.Err())
474 }
475
476 return history, nil
477}
478
479func (ms *fsMessageStore) LoadBeforeTime(network *Network, entity string, start time.Time, end time.Time, limit int, events bool) ([]*irc.Message, error) {
480 start = start.In(time.Local)
481 end = end.In(time.Local)
482 history := make([]*irc.Message, limit)
483 remaining := limit
484 tries := 0
485 for remaining > 0 && tries < fsMessageStoreMaxTries && end.Before(start) {
486 buf, err := ms.parseMessagesBefore(network, entity, start, end, events, remaining, -1)
487 if err != nil {
488 return nil, err
489 }
490 if len(buf) == 0 {
491 tries++
492 } else {
493 tries = 0
494 }
495 copy(history[remaining-len(buf):], buf)
496 remaining -= len(buf)
497 year, month, day := start.Date()
498 start = time.Date(year, month, day, 0, 0, 0, 0, start.Location()).Add(-1)
499 }
500
501 return history[remaining:], nil
502}
503
504func (ms *fsMessageStore) LoadAfterTime(network *Network, entity string, start time.Time, end time.Time, limit int, events bool) ([]*irc.Message, error) {
505 start = start.In(time.Local)
506 end = end.In(time.Local)
507 var history []*irc.Message
508 remaining := limit
509 tries := 0
510 for remaining > 0 && tries < fsMessageStoreMaxTries && start.Before(end) {
511 buf, err := ms.parseMessagesAfter(network, entity, start, end, events, remaining)
512 if err != nil {
513 return nil, err
514 }
515 if len(buf) == 0 {
516 tries++
517 } else {
518 tries = 0
519 }
520 history = append(history, buf...)
521 remaining -= len(buf)
522 year, month, day := start.Date()
523 start = time.Date(year, month, day+1, 0, 0, 0, 0, start.Location())
524 }
525 return history, nil
526}
527
528func (ms *fsMessageStore) LoadLatestID(network *Network, entity, id string, limit int) ([]*irc.Message, error) {
529 var afterTime time.Time
530 var afterOffset int64
531 if id != "" {
532 var idNet int64
533 var idEntity string
534 var err error
535 idNet, idEntity, afterTime, afterOffset, err = parseFSMsgID(id)
536 if err != nil {
537 return nil, err
538 }
539 if idNet != network.ID || idEntity != entity {
540 return nil, fmt.Errorf("cannot find message ID: message ID doesn't match network/entity")
541 }
542 }
543
544 history := make([]*irc.Message, limit)
545 t := time.Now()
546 remaining := limit
547 tries := 0
548 for remaining > 0 && tries < fsMessageStoreMaxTries && !truncateDay(t).Before(afterTime) {
549 var offset int64 = -1
550 if afterOffset >= 0 && truncateDay(t).Equal(afterTime) {
551 offset = afterOffset
552 }
553
554 buf, err := ms.parseMessagesBefore(network, entity, t, time.Time{}, false, remaining, offset)
555 if err != nil {
556 return nil, err
557 }
558 if len(buf) == 0 {
559 tries++
560 } else {
561 tries = 0
562 }
563 copy(history[remaining-len(buf):], buf)
564 remaining -= len(buf)
565 year, month, day := t.Date()
566 t = time.Date(year, month, day, 0, 0, 0, 0, t.Location()).Add(-1)
567 }
568
569 return history[remaining:], nil
570}
571
572func (ms *fsMessageStore) ListTargets(network *Network, start, end time.Time, limit int, events bool) ([]chatHistoryTarget, error) {
573 start = start.In(time.Local)
574 end = end.In(time.Local)
575 rootPath := filepath.Join(ms.root, escapeFilename(network.GetName()))
576 root, err := os.Open(rootPath)
577 if os.IsNotExist(err) {
578 return nil, nil
579 } else if err != nil {
580 return nil, err
581 }
582
583 // The returned targets are escaped, and there is no way to un-escape
584 // TODO: switch to ReadDir (Go 1.16+)
585 targetNames, err := root.Readdirnames(0)
586 root.Close()
587 if err != nil {
588 return nil, err
589 }
590
591 var targets []chatHistoryTarget
592 for _, target := range targetNames {
593 // target is already escaped here
594 targetPath := filepath.Join(rootPath, target)
595 targetDir, err := os.Open(targetPath)
596 if err != nil {
597 return nil, err
598 }
599
600 entries, err := targetDir.Readdir(0)
601 targetDir.Close()
602 if err != nil {
603 return nil, err
604 }
605
606 // We use mtime here, which may give imprecise or incorrect results
607 var t time.Time
608 for _, entry := range entries {
609 if entry.ModTime().After(t) {
610 t = entry.ModTime()
611 }
612 }
613
614 // The timestamps we get from logs have second granularity
615 t = truncateSecond(t)
616
617 // Filter out targets that don't fullfil the time bounds
618 if !isTimeBetween(t, start, end) {
619 continue
620 }
621
622 targets = append(targets, chatHistoryTarget{
623 Name: target,
624 LatestMessage: t,
625 })
626 }
627
628 // Sort targets by latest message time, backwards or forwards depending on
629 // the order of the time bounds
630 sort.Slice(targets, func(i, j int) bool {
631 t1, t2 := targets[i].LatestMessage, targets[j].LatestMessage
632 if start.Before(end) {
633 return t1.Before(t2)
634 } else {
635 return !t1.Before(t2)
636 }
637 })
638
639 // Truncate the result if necessary
640 if len(targets) > limit {
641 targets = targets[:limit]
642 }
643
644 return targets, nil
645}
646
647func (ms *fsMessageStore) RenameNetwork(oldNet, newNet *Network) error {
648 oldDir := filepath.Join(ms.root, escapeFilename(oldNet.GetName()))
649 newDir := filepath.Join(ms.root, escapeFilename(newNet.GetName()))
650 // Avoid loosing data by overwriting an existing directory
651 if _, err := os.Stat(newDir); err == nil {
652 return fmt.Errorf("destination %q already exists", newDir)
653 }
654 return os.Rename(oldDir, newDir)
655}
656
// truncateDay returns midnight at the start of t's calendar day, in t's
// location.
func truncateDay(t time.Time) time.Time {
	return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, t.Location())
}
661
// truncateSecond returns t with its sub-second part removed, keeping t's
// location. Truncate works on the absolute instant, unlike rebuilding the
// value from its wall-clock fields with time.Date. The rebuilt value is
// ambiguous while the clock is set back (DST end) and could jump by an hour.
func truncateSecond(t time.Time) time.Time {
	return t.Truncate(time.Second)
}
666
// isTimeBetween reports whether t lies strictly between start and end. The
// two bounds may be given in either order.
func isTimeBetween(t, start, end time.Time) bool {
	lo, hi := start, end
	if hi.Before(lo) {
		lo, hi = hi, lo
	}
	return t.After(lo) && t.Before(hi)
}
Note: See TracBrowser for help on using the repository browser.