source: code/trunk/msgstore_fs.go@ 761

Last change on this file since 761 was 668, checked in by contact, 4 years ago

msgstore_fs: abort on timeout

File size: 17.8 KB
RevLine 
[439]1package soju
2
3import (
4 "bufio"
[667]5 "context"
[439]6 "fmt"
7 "io"
8 "os"
9 "path/filepath"
[549]10 "sort"
[439]11 "strings"
12 "time"
13
[488]14 "git.sr.ht/~sircmpwn/go-bare"
[439]15 "gopkg.in/irc.v3"
16)
17
// Limits used by the filesystem message store.
const (
	// fsMessageStoreMaxFiles is the number of log files Append keeps open;
	// once exceeded, the least recently used files are closed.
	fsMessageStoreMaxFiles = 20

	// fsMessageStoreMaxTries is the number of consecutive days without any
	// message after which the history loaders stop walking through days.
	fsMessageStoreMaxTries = 100
)
[439]22
// escapeFilename turns an arbitrary name into a string usable as a single
// path component. The special names "." and ".." become "-" and "--"; in any
// other name every slash and backslash is replaced with a dash.
func escapeFilename(unsafe string) (safe string) {
	switch unsafe {
	case ".":
		return "-"
	case "..":
		return "--"
	}
	separators := strings.NewReplacer("/", "-", "\\", "-")
	return separators.Replace(unsafe)
}
[439]32
// date is a calendar day without a time of day.
type date struct {
	Year  int
	Month int
	Day   int
}

// newDate returns the calendar day of t, as reported by t.Date().
func newDate(t time.Time) date {
	y, m, d := t.Date()
	return date{Year: y, Month: int(m), Day: d}
}

// Time returns midnight at the start of d in the local time zone.
func (d date) Time() time.Time {
	month := time.Month(d.Month)
	return time.Date(d.Year, month, d.Day, 0, 0, 0, 0, time.Local)
}
45
46type fsMsgID struct {
47 Date date
48 Offset bare.Int
49}
50
51func (fsMsgID) msgIDType() msgIDType {
52 return msgIDFS
53}
54
55func parseFSMsgID(s string) (netID int64, entity string, t time.Time, offset int64, err error) {
56 var id fsMsgID
57 netID, entity, err = parseMsgID(s, &id)
58 if err != nil {
59 return 0, "", time.Time{}, 0, err
60 }
61 return netID, entity, id.Date.Time(), int64(id.Offset), nil
62}
63
64func formatFSMsgID(netID int64, entity string, t time.Time, offset int64) string {
65 id := fsMsgID{
66 Date: newDate(t),
67 Offset: bare.Int(offset),
68 }
69 return formatMsgID(netID, entity, &id)
70}
71
// fsMessageStoreFile is an open log file together with the time Append last
// used it.
type fsMessageStoreFile struct {
	*os.File

	// lastUse is refreshed on each Append to this file and decides which open
	// files are closed first when too many are open.
	lastUse time.Time
}
76
[439]77// fsMessageStore is a per-user on-disk store for IRC messages.
[642]78//
79// It mimicks the ZNC log layout and format. See the ZNC source:
80// https://github.com/znc/znc/blob/master/modules/log.cpp
[439]81type fsMessageStore struct {
82 root string
83
[608]84 // Write-only files used by Append
85 files map[string]*fsMessageStoreFile // indexed by entity
[439]86}
87
[517]88var _ messageStore = (*fsMessageStore)(nil)
89var _ chatHistoryMessageStore = (*fsMessageStore)(nil)
90
[439]91func newFSMessageStore(root, username string) *fsMessageStore {
92 return &fsMessageStore{
[591]93 root: filepath.Join(root, escapeFilename(username)),
[608]94 files: make(map[string]*fsMessageStoreFile),
[439]95 }
96}
97
[666]98func (ms *fsMessageStore) logPath(network *Network, entity string, t time.Time) string {
[439]99 year, month, day := t.Date()
100 filename := fmt.Sprintf("%04d-%02d-%02d.log", year, month, day)
[591]101 return filepath.Join(ms.root, escapeFilename(network.GetName()), escapeFilename(entity), filename)
[439]102}
103
104// nextMsgID queries the message ID for the next message to be written to f.
[666]105func nextFSMsgID(network *Network, entity string, t time.Time, f *os.File) (string, error) {
[439]106 offset, err := f.Seek(0, io.SeekEnd)
107 if err != nil {
[515]108 return "", fmt.Errorf("failed to query next FS message ID: %v", err)
[439]109 }
[440]110 return formatFSMsgID(network.ID, entity, t, offset), nil
[439]111}
112
[666]113func (ms *fsMessageStore) LastMsgID(network *Network, entity string, t time.Time) (string, error) {
[439]114 p := ms.logPath(network, entity, t)
115 fi, err := os.Stat(p)
116 if os.IsNotExist(err) {
[440]117 return formatFSMsgID(network.ID, entity, t, -1), nil
[439]118 } else if err != nil {
[515]119 return "", fmt.Errorf("failed to query last FS message ID: %v", err)
[439]120 }
[440]121 return formatFSMsgID(network.ID, entity, t, fi.Size()-1), nil
[439]122}
123
[666]124func (ms *fsMessageStore) Append(network *Network, entity string, msg *irc.Message) (string, error) {
[439]125 s := formatMessage(msg)
126 if s == "" {
127 return "", nil
128 }
129
130 var t time.Time
131 if tag, ok := msg.Tags["time"]; ok {
132 var err error
133 t, err = time.Parse(serverTimeLayout, string(tag))
134 if err != nil {
135 return "", fmt.Errorf("failed to parse message time tag: %v", err)
136 }
137 t = t.In(time.Local)
138 } else {
139 t = time.Now()
140 }
141
142 f := ms.files[entity]
143
144 // TODO: handle non-monotonic clock behaviour
145 path := ms.logPath(network, entity, t)
146 if f == nil || f.Name() != path {
147 dir := filepath.Dir(path)
[558]148 if err := os.MkdirAll(dir, 0750); err != nil {
[439]149 return "", fmt.Errorf("failed to create message logs directory %q: %v", dir, err)
150 }
151
[608]152 ff, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0640)
[439]153 if err != nil {
154 return "", fmt.Errorf("failed to open message log file %q: %v", path, err)
155 }
156
[608]157 if f != nil {
158 f.Close()
159 }
160 f = &fsMessageStoreFile{File: ff}
[439]161 ms.files[entity] = f
162 }
163
[608]164 f.lastUse = time.Now()
165
166 if len(ms.files) > fsMessageStoreMaxFiles {
167 entities := make([]string, 0, len(ms.files))
168 for name := range ms.files {
169 entities = append(entities, name)
170 }
171 sort.Slice(entities, func(i, j int) bool {
172 a, b := entities[i], entities[j]
173 return ms.files[a].lastUse.Before(ms.files[b].lastUse)
174 })
175 entities = entities[0 : len(entities)-fsMessageStoreMaxFiles]
176 for _, name := range entities {
177 ms.files[name].Close()
178 delete(ms.files, name)
179 }
180 }
181
182 msgID, err := nextFSMsgID(network, entity, t, f.File)
[439]183 if err != nil {
184 return "", fmt.Errorf("failed to generate message ID: %v", err)
185 }
186
187 _, err = fmt.Fprintf(f, "[%02d:%02d:%02d] %s\n", t.Hour(), t.Minute(), t.Second(), s)
188 if err != nil {
189 return "", fmt.Errorf("failed to log message to %q: %v", f.Name(), err)
190 }
191
192 return msgID, nil
193}
194
195func (ms *fsMessageStore) Close() error {
196 var closeErr error
197 for _, f := range ms.files {
198 if err := f.Close(); err != nil {
199 closeErr = fmt.Errorf("failed to close message store: %v", err)
200 }
201 }
202 return closeErr
203}
204
205// formatMessage formats a message log line. It assumes a well-formed IRC
206// message.
207func formatMessage(msg *irc.Message) string {
208 switch strings.ToUpper(msg.Command) {
209 case "NICK":
210 return fmt.Sprintf("*** %s is now known as %s", msg.Prefix.Name, msg.Params[0])
211 case "JOIN":
212 return fmt.Sprintf("*** Joins: %s (%s@%s)", msg.Prefix.Name, msg.Prefix.User, msg.Prefix.Host)
213 case "PART":
214 var reason string
215 if len(msg.Params) > 1 {
216 reason = msg.Params[1]
217 }
218 return fmt.Sprintf("*** Parts: %s (%s@%s) (%s)", msg.Prefix.Name, msg.Prefix.User, msg.Prefix.Host, reason)
219 case "KICK":
220 nick := msg.Params[1]
221 var reason string
222 if len(msg.Params) > 2 {
223 reason = msg.Params[2]
224 }
225 return fmt.Sprintf("*** %s was kicked by %s (%s)", nick, msg.Prefix.Name, reason)
226 case "QUIT":
227 var reason string
228 if len(msg.Params) > 0 {
229 reason = msg.Params[0]
230 }
231 return fmt.Sprintf("*** Quits: %s (%s@%s) (%s)", msg.Prefix.Name, msg.Prefix.User, msg.Prefix.Host, reason)
232 case "TOPIC":
233 var topic string
234 if len(msg.Params) > 1 {
235 topic = msg.Params[1]
236 }
237 return fmt.Sprintf("*** %s changes topic to '%s'", msg.Prefix.Name, topic)
238 case "MODE":
239 return fmt.Sprintf("*** %s sets mode: %s", msg.Prefix.Name, strings.Join(msg.Params[1:], " "))
240 case "NOTICE":
241 return fmt.Sprintf("-%s- %s", msg.Prefix.Name, msg.Params[1])
242 case "PRIVMSG":
243 if cmd, params, ok := parseCTCPMessage(msg); ok && cmd == "ACTION" {
244 return fmt.Sprintf("* %s %s", msg.Prefix.Name, params)
245 } else {
246 return fmt.Sprintf("<%s> %s", msg.Prefix.Name, msg.Params[1])
247 }
248 default:
249 return ""
250 }
251}
252
[665]253func parseMessage(line, entity string, ref time.Time, events bool) (*irc.Message, time.Time, error) {
[439]254 var hour, minute, second int
255 _, err := fmt.Sscanf(line, "[%02d:%02d:%02d] ", &hour, &minute, &second)
256 if err != nil {
[515]257 return nil, time.Time{}, fmt.Errorf("malformed timestamp prefix: %v", err)
[439]258 }
259 line = line[11:]
260
[665]261 var cmd string
262 var prefix *irc.Prefix
263 var params []string
264 if events && strings.HasPrefix(line, "*** ") {
265 parts := strings.SplitN(line[4:], " ", 2)
[439]266 if len(parts) != 2 {
267 return nil, time.Time{}, nil
268 }
[665]269 switch parts[0] {
270 case "Joins:", "Parts:", "Quits:":
271 args := strings.SplitN(parts[1], " ", 3)
272 if len(args) < 2 {
273 return nil, time.Time{}, nil
274 }
275 nick := args[0]
276 mask := strings.TrimSuffix(strings.TrimPrefix(args[1], "("), ")")
277 maskParts := strings.SplitN(mask, "@", 2)
278 if len(maskParts) != 2 {
279 return nil, time.Time{}, nil
280 }
281 prefix = &irc.Prefix{
282 Name: nick,
283 User: maskParts[0],
284 Host: maskParts[1],
285 }
286 var reason string
287 if len(args) > 2 {
288 reason = strings.TrimSuffix(strings.TrimPrefix(args[2], "("), ")")
289 }
290 switch parts[0] {
291 case "Joins:":
292 cmd = "JOIN"
293 params = []string{entity}
294 case "Parts:":
295 cmd = "PART"
296 if reason != "" {
297 params = []string{entity, reason}
298 } else {
299 params = []string{entity}
300 }
301 case "Quits:":
302 cmd = "QUIT"
303 if reason != "" {
304 params = []string{reason}
305 }
306 }
307 default:
308 nick := parts[0]
309 rem := parts[1]
310 if r := strings.TrimPrefix(rem, "is now known as "); r != rem {
311 cmd = "NICK"
312 prefix = &irc.Prefix{
313 Name: nick,
314 }
315 params = []string{r}
316 } else if r := strings.TrimPrefix(rem, "was kicked by "); r != rem {
317 args := strings.SplitN(r, " ", 2)
318 if len(args) != 2 {
319 return nil, time.Time{}, nil
320 }
321 cmd = "KICK"
322 prefix = &irc.Prefix{
323 Name: args[0],
324 }
325 reason := strings.TrimSuffix(strings.TrimPrefix(args[1], "("), ")")
326 params = []string{entity, nick}
327 if reason != "" {
328 params = append(params, reason)
329 }
330 } else if r := strings.TrimPrefix(rem, "changes topic to "); r != rem {
331 cmd = "TOPIC"
332 prefix = &irc.Prefix{
333 Name: nick,
334 }
335 topic := strings.TrimSuffix(strings.TrimPrefix(r, "'"), "'")
336 params = []string{entity, topic}
337 } else if r := strings.TrimPrefix(rem, "sets mode: "); r != rem {
338 cmd = "MODE"
339 prefix = &irc.Prefix{
340 Name: nick,
341 }
342 params = append([]string{entity}, strings.Split(r, " ")...)
343 } else {
344 return nil, time.Time{}, nil
345 }
[439]346 }
[665]347 } else {
348 var sender, text string
349 if strings.HasPrefix(line, "<") {
350 cmd = "PRIVMSG"
351 parts := strings.SplitN(line[1:], "> ", 2)
352 if len(parts) != 2 {
353 return nil, time.Time{}, nil
354 }
355 sender, text = parts[0], parts[1]
356 } else if strings.HasPrefix(line, "-") {
357 cmd = "NOTICE"
358 parts := strings.SplitN(line[1:], "- ", 2)
359 if len(parts) != 2 {
360 return nil, time.Time{}, nil
361 }
362 sender, text = parts[0], parts[1]
363 } else if strings.HasPrefix(line, "* ") {
364 cmd = "PRIVMSG"
365 parts := strings.SplitN(line[2:], " ", 2)
366 if len(parts) != 2 {
367 return nil, time.Time{}, nil
368 }
369 sender, text = parts[0], "\x01ACTION "+parts[1]+"\x01"
370 } else {
[439]371 return nil, time.Time{}, nil
372 }
[665]373
374 prefix = &irc.Prefix{Name: sender}
375 params = []string{entity, text}
[439]376 }
377
378 year, month, day := ref.Date()
379 t := time.Date(year, month, day, hour, minute, second, 0, time.Local)
380
381 msg := &irc.Message{
382 Tags: map[string]irc.TagValue{
383 "time": irc.TagValue(t.UTC().Format(serverTimeLayout)),
384 },
[665]385 Prefix: prefix,
[439]386 Command: cmd,
[665]387 Params: params,
[439]388 }
389 return msg, t, nil
390}
391
[666]392func (ms *fsMessageStore) parseMessagesBefore(network *Network, entity string, ref time.Time, end time.Time, events bool, limit int, afterOffset int64) ([]*irc.Message, error) {
[439]393 path := ms.logPath(network, entity, ref)
394 f, err := os.Open(path)
395 if err != nil {
396 if os.IsNotExist(err) {
397 return nil, nil
398 }
[515]399 return nil, fmt.Errorf("failed to parse messages before ref: %v", err)
[439]400 }
401 defer f.Close()
402
403 historyRing := make([]*irc.Message, limit)
404 cur := 0
405
406 sc := bufio.NewScanner(f)
407
408 if afterOffset >= 0 {
409 if _, err := f.Seek(afterOffset, io.SeekStart); err != nil {
410 return nil, nil
411 }
412 sc.Scan() // skip till next newline
413 }
414
415 for sc.Scan() {
[665]416 msg, t, err := parseMessage(sc.Text(), entity, ref, events)
[439]417 if err != nil {
418 return nil, err
[516]419 } else if msg == nil || !t.After(end) {
[439]420 continue
421 } else if !t.Before(ref) {
422 break
423 }
424
425 historyRing[cur%limit] = msg
426 cur++
427 }
428 if sc.Err() != nil {
[515]429 return nil, fmt.Errorf("failed to parse messages before ref: scanner error: %v", sc.Err())
[439]430 }
431
432 n := limit
433 if cur < limit {
434 n = cur
435 }
436 start := (cur - n + limit) % limit
437
438 if start+n <= limit { // ring doesnt wrap
439 return historyRing[start : start+n], nil
440 } else { // ring wraps
441 history := make([]*irc.Message, n)
442 r := copy(history, historyRing[start:])
443 copy(history[r:], historyRing[:n-r])
444 return history, nil
445 }
446}
447
[666]448func (ms *fsMessageStore) parseMessagesAfter(network *Network, entity string, ref time.Time, end time.Time, events bool, limit int) ([]*irc.Message, error) {
[439]449 path := ms.logPath(network, entity, ref)
450 f, err := os.Open(path)
451 if err != nil {
452 if os.IsNotExist(err) {
453 return nil, nil
454 }
[515]455 return nil, fmt.Errorf("failed to parse messages after ref: %v", err)
[439]456 }
457 defer f.Close()
458
459 var history []*irc.Message
460 sc := bufio.NewScanner(f)
461 for sc.Scan() && len(history) < limit {
[665]462 msg, t, err := parseMessage(sc.Text(), entity, ref, events)
[439]463 if err != nil {
464 return nil, err
465 } else if msg == nil || !t.After(ref) {
466 continue
[516]467 } else if !t.Before(end) {
468 break
[439]469 }
470
471 history = append(history, msg)
472 }
473 if sc.Err() != nil {
[515]474 return nil, fmt.Errorf("failed to parse messages after ref: scanner error: %v", sc.Err())
[439]475 }
476
477 return history, nil
478}
479
[667]480func (ms *fsMessageStore) LoadBeforeTime(ctx context.Context, network *Network, entity string, start time.Time, end time.Time, limit int, events bool) ([]*irc.Message, error) {
[610]481 start = start.In(time.Local)
482 end = end.In(time.Local)
[439]483 history := make([]*irc.Message, limit)
484 remaining := limit
485 tries := 0
[516]486 for remaining > 0 && tries < fsMessageStoreMaxTries && end.Before(start) {
[665]487 buf, err := ms.parseMessagesBefore(network, entity, start, end, events, remaining, -1)
[439]488 if err != nil {
489 return nil, err
490 }
491 if len(buf) == 0 {
492 tries++
493 } else {
494 tries = 0
495 }
496 copy(history[remaining-len(buf):], buf)
497 remaining -= len(buf)
[516]498 year, month, day := start.Date()
499 start = time.Date(year, month, day, 0, 0, 0, 0, start.Location()).Add(-1)
[668]500
501 if err := ctx.Err(); err != nil {
502 return nil, err
503 }
[439]504 }
505
506 return history[remaining:], nil
507}
508
[667]509func (ms *fsMessageStore) LoadAfterTime(ctx context.Context, network *Network, entity string, start time.Time, end time.Time, limit int, events bool) ([]*irc.Message, error) {
[610]510 start = start.In(time.Local)
511 end = end.In(time.Local)
[439]512 var history []*irc.Message
513 remaining := limit
514 tries := 0
[516]515 for remaining > 0 && tries < fsMessageStoreMaxTries && start.Before(end) {
[665]516 buf, err := ms.parseMessagesAfter(network, entity, start, end, events, remaining)
[439]517 if err != nil {
518 return nil, err
519 }
520 if len(buf) == 0 {
521 tries++
522 } else {
523 tries = 0
524 }
525 history = append(history, buf...)
526 remaining -= len(buf)
[516]527 year, month, day := start.Date()
528 start = time.Date(year, month, day+1, 0, 0, 0, 0, start.Location())
[668]529
530 if err := ctx.Err(); err != nil {
531 return nil, err
532 }
[439]533 }
534 return history, nil
535}
536
[667]537func (ms *fsMessageStore) LoadLatestID(ctx context.Context, network *Network, entity, id string, limit int) ([]*irc.Message, error) {
[439]538 var afterTime time.Time
539 var afterOffset int64
540 if id != "" {
[440]541 var idNet int64
542 var idEntity string
[439]543 var err error
[440]544 idNet, idEntity, afterTime, afterOffset, err = parseFSMsgID(id)
[439]545 if err != nil {
546 return nil, err
547 }
[440]548 if idNet != network.ID || idEntity != entity {
[439]549 return nil, fmt.Errorf("cannot find message ID: message ID doesn't match network/entity")
550 }
551 }
552
553 history := make([]*irc.Message, limit)
554 t := time.Now()
555 remaining := limit
556 tries := 0
557 for remaining > 0 && tries < fsMessageStoreMaxTries && !truncateDay(t).Before(afterTime) {
558 var offset int64 = -1
559 if afterOffset >= 0 && truncateDay(t).Equal(afterTime) {
560 offset = afterOffset
561 }
562
[665]563 buf, err := ms.parseMessagesBefore(network, entity, t, time.Time{}, false, remaining, offset)
[439]564 if err != nil {
565 return nil, err
566 }
567 if len(buf) == 0 {
568 tries++
569 } else {
570 tries = 0
571 }
572 copy(history[remaining-len(buf):], buf)
573 remaining -= len(buf)
574 year, month, day := t.Date()
575 t = time.Date(year, month, day, 0, 0, 0, 0, t.Location()).Add(-1)
[668]576
577 if err := ctx.Err(); err != nil {
578 return nil, err
579 }
[439]580 }
581
582 return history[remaining:], nil
583}
[549]584
[667]585func (ms *fsMessageStore) ListTargets(ctx context.Context, network *Network, start, end time.Time, limit int, events bool) ([]chatHistoryTarget, error) {
[610]586 start = start.In(time.Local)
587 end = end.In(time.Local)
[591]588 rootPath := filepath.Join(ms.root, escapeFilename(network.GetName()))
[549]589 root, err := os.Open(rootPath)
[628]590 if os.IsNotExist(err) {
591 return nil, nil
592 } else if err != nil {
[549]593 return nil, err
594 }
595
596 // The returned targets are escaped, and there is no way to un-escape
597 // TODO: switch to ReadDir (Go 1.16+)
598 targetNames, err := root.Readdirnames(0)
599 root.Close()
600 if err != nil {
601 return nil, err
602 }
603
604 var targets []chatHistoryTarget
605 for _, target := range targetNames {
606 // target is already escaped here
607 targetPath := filepath.Join(rootPath, target)
608 targetDir, err := os.Open(targetPath)
609 if err != nil {
610 return nil, err
611 }
612
613 entries, err := targetDir.Readdir(0)
614 targetDir.Close()
615 if err != nil {
616 return nil, err
617 }
618
619 // We use mtime here, which may give imprecise or incorrect results
620 var t time.Time
621 for _, entry := range entries {
622 if entry.ModTime().After(t) {
623 t = entry.ModTime()
624 }
625 }
626
627 // The timestamps we get from logs have second granularity
628 t = truncateSecond(t)
629
630 // Filter out targets that don't fullfil the time bounds
631 if !isTimeBetween(t, start, end) {
632 continue
633 }
634
635 targets = append(targets, chatHistoryTarget{
636 Name: target,
637 LatestMessage: t,
638 })
[668]639
640 if err := ctx.Err(); err != nil {
641 return nil, err
642 }
[549]643 }
644
645 // Sort targets by latest message time, backwards or forwards depending on
646 // the order of the time bounds
647 sort.Slice(targets, func(i, j int) bool {
648 t1, t2 := targets[i].LatestMessage, targets[j].LatestMessage
649 if start.Before(end) {
650 return t1.Before(t2)
651 } else {
652 return !t1.Before(t2)
653 }
654 })
655
656 // Truncate the result if necessary
657 if len(targets) > limit {
658 targets = targets[:limit]
659 }
660
661 return targets, nil
662}
663
[666]664func (ms *fsMessageStore) RenameNetwork(oldNet, newNet *Network) error {
[644]665 oldDir := filepath.Join(ms.root, escapeFilename(oldNet.GetName()))
666 newDir := filepath.Join(ms.root, escapeFilename(newNet.GetName()))
667 // Avoid loosing data by overwriting an existing directory
668 if _, err := os.Stat(newDir); err == nil {
669 return fmt.Errorf("destination %q already exists", newDir)
670 }
671 return os.Rename(oldDir, newDir)
672}
673
// truncateDay returns midnight at the start of t's calendar day, in t's own
// location.
func truncateDay(t time.Time) time.Time {
	y, m, d := t.Date()
	loc := t.Location()
	return time.Date(y, m, d, 0, 0, 0, 0, loc)
}
678
// truncateSecond returns t with its sub-second part dropped, keeping t's
// calendar date, wall-clock time and location.
func truncateSecond(t time.Time) time.Time {
	y, m, d := t.Date()
	hour, minute, second := t.Clock()
	return time.Date(y, m, d, hour, minute, second, 0, t.Location())
}
683
// isTimeBetween reports whether t lies strictly between start and end. The
// two bounds may be given in either order.
func isTimeBetween(t, start, end time.Time) bool {
	lower, upper := start, end
	if upper.Before(lower) {
		lower, upper = upper, lower
	}
	return t.After(lower) && upper.After(t)
}
Note: See TracBrowser for help on using the repository browser.