source: code/trunk/msgstore_fs.go@ 620

Last change on this file since 620 was 610, checked in by alex, 4 years ago

chathistory: Fix truncated backlog due to timezones

Because msgstore_fs writes logs in localtime, the CHATHISTORY timestamps
(UTC) must be converted to localtime prior to filtering ranges, to ensure
the right range is sent back to the client.

Prior to this patch, the iteration back from the BEFORE time failed to
load the hours between midnight UTC and midnight localtime in each day's
logged messages. This is because the final time to be considered in a
day's log file (the "start" time) reuses the previous start time's
locale:

start = time.Date(year, month, day, 0, 0, 0, 0, start.Location()).Add(-1)

By converting the original start and end from the CHATHISTORY commands
to localtime in Load*Time and ListTargets, we ensure we read through
midnight each day.

File size: 14.6 KB
Line 
1package soju
2
3import (
4 "bufio"
5 "fmt"
6 "io"
7 "os"
8 "path/filepath"
9 "sort"
10 "strings"
11 "time"
12
13 "git.sr.ht/~sircmpwn/go-bare"
14 "gopkg.in/irc.v3"
15)
16
// fsMessageStoreMaxFiles is the number of log files Append keeps open at
// once; when it is exceeded, the least recently used files are closed.
const fsMessageStoreMaxFiles = 20

// fsMessageStoreMaxTries is the number of consecutive days yielding no
// message that the history loaders walk through before giving up.
const fsMessageStoreMaxTries = 100
21
// escapeFilename maps an arbitrary name to a string that can be used as a
// single path component. The special names "." and ".." become "-" and "--";
// in any other name every "/" and "\" is replaced with "-".
func escapeFilename(unsafe string) (safe string) {
	switch unsafe {
	case ".":
		return "-"
	case "..":
		return "--"
	}
	separators := strings.NewReplacer("/", "-", "\\", "-")
	return separators.Replace(unsafe)
}
31
// date is a calendar day, without a time of day or a time zone.
type date struct {
	Year  int
	Month int
	Day   int
}

// newDate returns the calendar day of t, as observed in t's own location.
func newDate(t time.Time) date {
	y, m, d := t.Date()
	return date{Year: y, Month: int(m), Day: d}
}

// Time returns midnight at the start of d in the local time zone.
func (d date) Time() time.Time {
	month := time.Month(d.Month)
	return time.Date(d.Year, month, d.Day, 0, 0, 0, 0, time.Local)
}
44
45type fsMsgID struct {
46 Date date
47 Offset bare.Int
48}
49
50func (fsMsgID) msgIDType() msgIDType {
51 return msgIDFS
52}
53
54func parseFSMsgID(s string) (netID int64, entity string, t time.Time, offset int64, err error) {
55 var id fsMsgID
56 netID, entity, err = parseMsgID(s, &id)
57 if err != nil {
58 return 0, "", time.Time{}, 0, err
59 }
60 return netID, entity, id.Date.Time(), int64(id.Offset), nil
61}
62
63func formatFSMsgID(netID int64, entity string, t time.Time, offset int64) string {
64 id := fsMsgID{
65 Date: newDate(t),
66 Offset: bare.Int(offset),
67 }
68 return formatMsgID(netID, entity, &id)
69}
70
71type fsMessageStoreFile struct {
72 *os.File
73 lastUse time.Time
74}
75
76// fsMessageStore is a per-user on-disk store for IRC messages.
77type fsMessageStore struct {
78 root string
79
80 // Write-only files used by Append
81 files map[string]*fsMessageStoreFile // indexed by entity
82}
83
84var _ messageStore = (*fsMessageStore)(nil)
85var _ chatHistoryMessageStore = (*fsMessageStore)(nil)
86
87func newFSMessageStore(root, username string) *fsMessageStore {
88 return &fsMessageStore{
89 root: filepath.Join(root, escapeFilename(username)),
90 files: make(map[string]*fsMessageStoreFile),
91 }
92}
93
94func (ms *fsMessageStore) logPath(network *network, entity string, t time.Time) string {
95 year, month, day := t.Date()
96 filename := fmt.Sprintf("%04d-%02d-%02d.log", year, month, day)
97 return filepath.Join(ms.root, escapeFilename(network.GetName()), escapeFilename(entity), filename)
98}
99
100// nextMsgID queries the message ID for the next message to be written to f.
101func nextFSMsgID(network *network, entity string, t time.Time, f *os.File) (string, error) {
102 offset, err := f.Seek(0, io.SeekEnd)
103 if err != nil {
104 return "", fmt.Errorf("failed to query next FS message ID: %v", err)
105 }
106 return formatFSMsgID(network.ID, entity, t, offset), nil
107}
108
109func (ms *fsMessageStore) LastMsgID(network *network, entity string, t time.Time) (string, error) {
110 p := ms.logPath(network, entity, t)
111 fi, err := os.Stat(p)
112 if os.IsNotExist(err) {
113 return formatFSMsgID(network.ID, entity, t, -1), nil
114 } else if err != nil {
115 return "", fmt.Errorf("failed to query last FS message ID: %v", err)
116 }
117 return formatFSMsgID(network.ID, entity, t, fi.Size()-1), nil
118}
119
120func (ms *fsMessageStore) Append(network *network, entity string, msg *irc.Message) (string, error) {
121 s := formatMessage(msg)
122 if s == "" {
123 return "", nil
124 }
125
126 var t time.Time
127 if tag, ok := msg.Tags["time"]; ok {
128 var err error
129 t, err = time.Parse(serverTimeLayout, string(tag))
130 if err != nil {
131 return "", fmt.Errorf("failed to parse message time tag: %v", err)
132 }
133 t = t.In(time.Local)
134 } else {
135 t = time.Now()
136 }
137
138 f := ms.files[entity]
139
140 // TODO: handle non-monotonic clock behaviour
141 path := ms.logPath(network, entity, t)
142 if f == nil || f.Name() != path {
143 dir := filepath.Dir(path)
144 if err := os.MkdirAll(dir, 0750); err != nil {
145 return "", fmt.Errorf("failed to create message logs directory %q: %v", dir, err)
146 }
147
148 ff, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0640)
149 if err != nil {
150 return "", fmt.Errorf("failed to open message log file %q: %v", path, err)
151 }
152
153 if f != nil {
154 f.Close()
155 }
156 f = &fsMessageStoreFile{File: ff}
157 ms.files[entity] = f
158 }
159
160 f.lastUse = time.Now()
161
162 if len(ms.files) > fsMessageStoreMaxFiles {
163 entities := make([]string, 0, len(ms.files))
164 for name := range ms.files {
165 entities = append(entities, name)
166 }
167 sort.Slice(entities, func(i, j int) bool {
168 a, b := entities[i], entities[j]
169 return ms.files[a].lastUse.Before(ms.files[b].lastUse)
170 })
171 entities = entities[0 : len(entities)-fsMessageStoreMaxFiles]
172 for _, name := range entities {
173 ms.files[name].Close()
174 delete(ms.files, name)
175 }
176 }
177
178 msgID, err := nextFSMsgID(network, entity, t, f.File)
179 if err != nil {
180 return "", fmt.Errorf("failed to generate message ID: %v", err)
181 }
182
183 _, err = fmt.Fprintf(f, "[%02d:%02d:%02d] %s\n", t.Hour(), t.Minute(), t.Second(), s)
184 if err != nil {
185 return "", fmt.Errorf("failed to log message to %q: %v", f.Name(), err)
186 }
187
188 return msgID, nil
189}
190
191func (ms *fsMessageStore) Close() error {
192 var closeErr error
193 for _, f := range ms.files {
194 if err := f.Close(); err != nil {
195 closeErr = fmt.Errorf("failed to close message store: %v", err)
196 }
197 }
198 return closeErr
199}
200
201// formatMessage formats a message log line. It assumes a well-formed IRC
202// message.
203func formatMessage(msg *irc.Message) string {
204 switch strings.ToUpper(msg.Command) {
205 case "NICK":
206 return fmt.Sprintf("*** %s is now known as %s", msg.Prefix.Name, msg.Params[0])
207 case "JOIN":
208 return fmt.Sprintf("*** Joins: %s (%s@%s)", msg.Prefix.Name, msg.Prefix.User, msg.Prefix.Host)
209 case "PART":
210 var reason string
211 if len(msg.Params) > 1 {
212 reason = msg.Params[1]
213 }
214 return fmt.Sprintf("*** Parts: %s (%s@%s) (%s)", msg.Prefix.Name, msg.Prefix.User, msg.Prefix.Host, reason)
215 case "KICK":
216 nick := msg.Params[1]
217 var reason string
218 if len(msg.Params) > 2 {
219 reason = msg.Params[2]
220 }
221 return fmt.Sprintf("*** %s was kicked by %s (%s)", nick, msg.Prefix.Name, reason)
222 case "QUIT":
223 var reason string
224 if len(msg.Params) > 0 {
225 reason = msg.Params[0]
226 }
227 return fmt.Sprintf("*** Quits: %s (%s@%s) (%s)", msg.Prefix.Name, msg.Prefix.User, msg.Prefix.Host, reason)
228 case "TOPIC":
229 var topic string
230 if len(msg.Params) > 1 {
231 topic = msg.Params[1]
232 }
233 return fmt.Sprintf("*** %s changes topic to '%s'", msg.Prefix.Name, topic)
234 case "MODE":
235 return fmt.Sprintf("*** %s sets mode: %s", msg.Prefix.Name, strings.Join(msg.Params[1:], " "))
236 case "NOTICE":
237 return fmt.Sprintf("-%s- %s", msg.Prefix.Name, msg.Params[1])
238 case "PRIVMSG":
239 if cmd, params, ok := parseCTCPMessage(msg); ok && cmd == "ACTION" {
240 return fmt.Sprintf("* %s %s", msg.Prefix.Name, params)
241 } else {
242 return fmt.Sprintf("<%s> %s", msg.Prefix.Name, msg.Params[1])
243 }
244 default:
245 return ""
246 }
247}
248
249func parseMessage(line, entity string, ref time.Time) (*irc.Message, time.Time, error) {
250 var hour, minute, second int
251 _, err := fmt.Sscanf(line, "[%02d:%02d:%02d] ", &hour, &minute, &second)
252 if err != nil {
253 return nil, time.Time{}, fmt.Errorf("malformed timestamp prefix: %v", err)
254 }
255 line = line[11:]
256
257 var cmd, sender, text string
258 if strings.HasPrefix(line, "<") {
259 cmd = "PRIVMSG"
260 parts := strings.SplitN(line[1:], "> ", 2)
261 if len(parts) != 2 {
262 return nil, time.Time{}, nil
263 }
264 sender, text = parts[0], parts[1]
265 } else if strings.HasPrefix(line, "-") {
266 cmd = "NOTICE"
267 parts := strings.SplitN(line[1:], "- ", 2)
268 if len(parts) != 2 {
269 return nil, time.Time{}, nil
270 }
271 sender, text = parts[0], parts[1]
272 } else if strings.HasPrefix(line, "* ") {
273 cmd = "PRIVMSG"
274 parts := strings.SplitN(line[2:], " ", 2)
275 if len(parts) != 2 {
276 return nil, time.Time{}, nil
277 }
278 sender, text = parts[0], "\x01ACTION "+parts[1]+"\x01"
279 } else {
280 return nil, time.Time{}, nil
281 }
282
283 year, month, day := ref.Date()
284 t := time.Date(year, month, day, hour, minute, second, 0, time.Local)
285
286 msg := &irc.Message{
287 Tags: map[string]irc.TagValue{
288 "time": irc.TagValue(t.UTC().Format(serverTimeLayout)),
289 },
290 Prefix: &irc.Prefix{Name: sender},
291 Command: cmd,
292 Params: []string{entity, text},
293 }
294 return msg, t, nil
295}
296
297func (ms *fsMessageStore) parseMessagesBefore(network *network, entity string, ref time.Time, end time.Time, limit int, afterOffset int64) ([]*irc.Message, error) {
298 path := ms.logPath(network, entity, ref)
299 f, err := os.Open(path)
300 if err != nil {
301 if os.IsNotExist(err) {
302 return nil, nil
303 }
304 return nil, fmt.Errorf("failed to parse messages before ref: %v", err)
305 }
306 defer f.Close()
307
308 historyRing := make([]*irc.Message, limit)
309 cur := 0
310
311 sc := bufio.NewScanner(f)
312
313 if afterOffset >= 0 {
314 if _, err := f.Seek(afterOffset, io.SeekStart); err != nil {
315 return nil, nil
316 }
317 sc.Scan() // skip till next newline
318 }
319
320 for sc.Scan() {
321 msg, t, err := parseMessage(sc.Text(), entity, ref)
322 if err != nil {
323 return nil, err
324 } else if msg == nil || !t.After(end) {
325 continue
326 } else if !t.Before(ref) {
327 break
328 }
329
330 historyRing[cur%limit] = msg
331 cur++
332 }
333 if sc.Err() != nil {
334 return nil, fmt.Errorf("failed to parse messages before ref: scanner error: %v", sc.Err())
335 }
336
337 n := limit
338 if cur < limit {
339 n = cur
340 }
341 start := (cur - n + limit) % limit
342
343 if start+n <= limit { // ring doesnt wrap
344 return historyRing[start : start+n], nil
345 } else { // ring wraps
346 history := make([]*irc.Message, n)
347 r := copy(history, historyRing[start:])
348 copy(history[r:], historyRing[:n-r])
349 return history, nil
350 }
351}
352
353func (ms *fsMessageStore) parseMessagesAfter(network *network, entity string, ref time.Time, end time.Time, limit int) ([]*irc.Message, error) {
354 path := ms.logPath(network, entity, ref)
355 f, err := os.Open(path)
356 if err != nil {
357 if os.IsNotExist(err) {
358 return nil, nil
359 }
360 return nil, fmt.Errorf("failed to parse messages after ref: %v", err)
361 }
362 defer f.Close()
363
364 var history []*irc.Message
365 sc := bufio.NewScanner(f)
366 for sc.Scan() && len(history) < limit {
367 msg, t, err := parseMessage(sc.Text(), entity, ref)
368 if err != nil {
369 return nil, err
370 } else if msg == nil || !t.After(ref) {
371 continue
372 } else if !t.Before(end) {
373 break
374 }
375
376 history = append(history, msg)
377 }
378 if sc.Err() != nil {
379 return nil, fmt.Errorf("failed to parse messages after ref: scanner error: %v", sc.Err())
380 }
381
382 return history, nil
383}
384
385func (ms *fsMessageStore) LoadBeforeTime(network *network, entity string, start time.Time, end time.Time, limit int) ([]*irc.Message, error) {
386 start = start.In(time.Local)
387 end = end.In(time.Local)
388 history := make([]*irc.Message, limit)
389 remaining := limit
390 tries := 0
391 for remaining > 0 && tries < fsMessageStoreMaxTries && end.Before(start) {
392 buf, err := ms.parseMessagesBefore(network, entity, start, end, remaining, -1)
393 if err != nil {
394 return nil, err
395 }
396 if len(buf) == 0 {
397 tries++
398 } else {
399 tries = 0
400 }
401 copy(history[remaining-len(buf):], buf)
402 remaining -= len(buf)
403 year, month, day := start.Date()
404 start = time.Date(year, month, day, 0, 0, 0, 0, start.Location()).Add(-1)
405 }
406
407 return history[remaining:], nil
408}
409
410func (ms *fsMessageStore) LoadAfterTime(network *network, entity string, start time.Time, end time.Time, limit int) ([]*irc.Message, error) {
411 start = start.In(time.Local)
412 end = end.In(time.Local)
413 var history []*irc.Message
414 remaining := limit
415 tries := 0
416 for remaining > 0 && tries < fsMessageStoreMaxTries && start.Before(end) {
417 buf, err := ms.parseMessagesAfter(network, entity, start, end, remaining)
418 if err != nil {
419 return nil, err
420 }
421 if len(buf) == 0 {
422 tries++
423 } else {
424 tries = 0
425 }
426 history = append(history, buf...)
427 remaining -= len(buf)
428 year, month, day := start.Date()
429 start = time.Date(year, month, day+1, 0, 0, 0, 0, start.Location())
430 }
431 return history, nil
432}
433
434func (ms *fsMessageStore) LoadLatestID(network *network, entity, id string, limit int) ([]*irc.Message, error) {
435 var afterTime time.Time
436 var afterOffset int64
437 if id != "" {
438 var idNet int64
439 var idEntity string
440 var err error
441 idNet, idEntity, afterTime, afterOffset, err = parseFSMsgID(id)
442 if err != nil {
443 return nil, err
444 }
445 if idNet != network.ID || idEntity != entity {
446 return nil, fmt.Errorf("cannot find message ID: message ID doesn't match network/entity")
447 }
448 }
449
450 history := make([]*irc.Message, limit)
451 t := time.Now()
452 remaining := limit
453 tries := 0
454 for remaining > 0 && tries < fsMessageStoreMaxTries && !truncateDay(t).Before(afterTime) {
455 var offset int64 = -1
456 if afterOffset >= 0 && truncateDay(t).Equal(afterTime) {
457 offset = afterOffset
458 }
459
460 buf, err := ms.parseMessagesBefore(network, entity, t, time.Time{}, remaining, offset)
461 if err != nil {
462 return nil, err
463 }
464 if len(buf) == 0 {
465 tries++
466 } else {
467 tries = 0
468 }
469 copy(history[remaining-len(buf):], buf)
470 remaining -= len(buf)
471 year, month, day := t.Date()
472 t = time.Date(year, month, day, 0, 0, 0, 0, t.Location()).Add(-1)
473 }
474
475 return history[remaining:], nil
476}
477
478func (ms *fsMessageStore) ListTargets(network *network, start, end time.Time, limit int) ([]chatHistoryTarget, error) {
479 start = start.In(time.Local)
480 end = end.In(time.Local)
481 rootPath := filepath.Join(ms.root, escapeFilename(network.GetName()))
482 root, err := os.Open(rootPath)
483 if err != nil {
484 return nil, err
485 }
486
487 // The returned targets are escaped, and there is no way to un-escape
488 // TODO: switch to ReadDir (Go 1.16+)
489 targetNames, err := root.Readdirnames(0)
490 root.Close()
491 if err != nil {
492 return nil, err
493 }
494
495 var targets []chatHistoryTarget
496 for _, target := range targetNames {
497 // target is already escaped here
498 targetPath := filepath.Join(rootPath, target)
499 targetDir, err := os.Open(targetPath)
500 if err != nil {
501 return nil, err
502 }
503
504 entries, err := targetDir.Readdir(0)
505 targetDir.Close()
506 if err != nil {
507 return nil, err
508 }
509
510 // We use mtime here, which may give imprecise or incorrect results
511 var t time.Time
512 for _, entry := range entries {
513 if entry.ModTime().After(t) {
514 t = entry.ModTime()
515 }
516 }
517
518 // The timestamps we get from logs have second granularity
519 t = truncateSecond(t)
520
521 // Filter out targets that don't fullfil the time bounds
522 if !isTimeBetween(t, start, end) {
523 continue
524 }
525
526 targets = append(targets, chatHistoryTarget{
527 Name: target,
528 LatestMessage: t,
529 })
530 }
531
532 // Sort targets by latest message time, backwards or forwards depending on
533 // the order of the time bounds
534 sort.Slice(targets, func(i, j int) bool {
535 t1, t2 := targets[i].LatestMessage, targets[j].LatestMessage
536 if start.Before(end) {
537 return t1.Before(t2)
538 } else {
539 return !t1.Before(t2)
540 }
541 })
542
543 // Truncate the result if necessary
544 if len(targets) > limit {
545 targets = targets[:limit]
546 }
547
548 return targets, nil
549}
550
// truncateDay returns midnight at the start of the day of t, in t's location.
func truncateDay(t time.Time) time.Time {
	return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, t.Location())
}
555
// truncateSecond returns t with its sub-second part dropped, keeping the
// same wall-clock date, time of day and location.
func truncateSecond(t time.Time) time.Time {
	year, month, day := t.Date()
	hour, min, sec := t.Clock()
	return time.Date(year, month, day, hour, min, sec, 0, t.Location())
}
560
// isTimeBetween reports whether t lies strictly between start and end. The
// two bounds may be given in either order.
func isTimeBetween(t, start, end time.Time) bool {
	lo, hi := start, end
	if hi.Before(lo) {
		lo, hi = hi, lo
	}
	return t.After(lo) && hi.After(t)
}
Note: See TracBrowser for help on using the repository browser.