source: code/trunk/msgstore_fs.go@ 658

Last change on this file since 658 was 644, checked in by contact, 4 years ago

msgstore_fs: rename log dir when network is renamed

File size: 15.1 KB
RevLine 
[439]1package soju
2
3import (
4 "bufio"
5 "fmt"
6 "io"
7 "os"
8 "path/filepath"
[549]9 "sort"
[439]10 "strings"
11 "time"
12
[488]13 "git.sr.ht/~sircmpwn/go-bare"
[439]14 "gopkg.in/irc.v3"
15)
16
const (
	// fsMessageStoreMaxFiles is the number of log files Append keeps open
	// at once; the least recently used ones are closed beyond that.
	fsMessageStoreMaxFiles = 20
	// fsMessageStoreMaxTries bounds how many consecutive days without any
	// message the history loaders walk through before giving up.
	fsMessageStoreMaxTries = 100
)
[439]21
// escapeFilename turns an arbitrary name into a string usable as a single
// path component. The special names "." and ".." become "-" and "--", and
// every slash or backslash is replaced with a dash.
func escapeFilename(unsafe string) (safe string) {
	switch unsafe {
	case ".":
		return "-"
	case "..":
		return "--"
	}
	// Byte-level replacement: any other content is passed through untouched.
	noSlash := strings.Replace(unsafe, "/", "-", -1)
	return strings.Replace(noSlash, "\\", "-", -1)
}
[439]31
// date is a calendar day without any time-of-day component.
type date struct {
	Year, Month, Day int
}

// newDate extracts the calendar day of t, as seen in t's own location.
func newDate(t time.Time) date {
	y, m, d := t.Date()
	return date{Year: y, Month: int(m), Day: d}
}

// Time returns midnight at the start of the day d in the local time zone.
func (d date) Time() time.Time {
	month := time.Month(d.Month)
	return time.Date(d.Year, month, d.Day, 0, 0, 0, 0, time.Local)
}
44
45type fsMsgID struct {
46 Date date
47 Offset bare.Int
48}
49
50func (fsMsgID) msgIDType() msgIDType {
51 return msgIDFS
52}
53
54func parseFSMsgID(s string) (netID int64, entity string, t time.Time, offset int64, err error) {
55 var id fsMsgID
56 netID, entity, err = parseMsgID(s, &id)
57 if err != nil {
58 return 0, "", time.Time{}, 0, err
59 }
60 return netID, entity, id.Date.Time(), int64(id.Offset), nil
61}
62
63func formatFSMsgID(netID int64, entity string, t time.Time, offset int64) string {
64 id := fsMsgID{
65 Date: newDate(t),
66 Offset: bare.Int(offset),
67 }
68 return formatMsgID(netID, entity, &id)
69}
70
// fsMessageStoreFile is an open log file together with the time it was last
// appended to. Append uses lastUse to decide which files to close first.
type fsMessageStoreFile struct {
	*os.File
	lastUse time.Time
}
75
[439]76// fsMessageStore is a per-user on-disk store for IRC messages.
[642]77//
78// It mimicks the ZNC log layout and format. See the ZNC source:
79// https://github.com/znc/znc/blob/master/modules/log.cpp
[439]80type fsMessageStore struct {
81 root string
82
[608]83 // Write-only files used by Append
84 files map[string]*fsMessageStoreFile // indexed by entity
[439]85}
86
[517]87var _ messageStore = (*fsMessageStore)(nil)
88var _ chatHistoryMessageStore = (*fsMessageStore)(nil)
89
[439]90func newFSMessageStore(root, username string) *fsMessageStore {
91 return &fsMessageStore{
[591]92 root: filepath.Join(root, escapeFilename(username)),
[608]93 files: make(map[string]*fsMessageStoreFile),
[439]94 }
95}
96
97func (ms *fsMessageStore) logPath(network *network, entity string, t time.Time) string {
98 year, month, day := t.Date()
99 filename := fmt.Sprintf("%04d-%02d-%02d.log", year, month, day)
[591]100 return filepath.Join(ms.root, escapeFilename(network.GetName()), escapeFilename(entity), filename)
[439]101}
102
103// nextMsgID queries the message ID for the next message to be written to f.
[440]104func nextFSMsgID(network *network, entity string, t time.Time, f *os.File) (string, error) {
[439]105 offset, err := f.Seek(0, io.SeekEnd)
106 if err != nil {
[515]107 return "", fmt.Errorf("failed to query next FS message ID: %v", err)
[439]108 }
[440]109 return formatFSMsgID(network.ID, entity, t, offset), nil
[439]110}
111
112func (ms *fsMessageStore) LastMsgID(network *network, entity string, t time.Time) (string, error) {
113 p := ms.logPath(network, entity, t)
114 fi, err := os.Stat(p)
115 if os.IsNotExist(err) {
[440]116 return formatFSMsgID(network.ID, entity, t, -1), nil
[439]117 } else if err != nil {
[515]118 return "", fmt.Errorf("failed to query last FS message ID: %v", err)
[439]119 }
[440]120 return formatFSMsgID(network.ID, entity, t, fi.Size()-1), nil
[439]121}
122
123func (ms *fsMessageStore) Append(network *network, entity string, msg *irc.Message) (string, error) {
124 s := formatMessage(msg)
125 if s == "" {
126 return "", nil
127 }
128
129 var t time.Time
130 if tag, ok := msg.Tags["time"]; ok {
131 var err error
132 t, err = time.Parse(serverTimeLayout, string(tag))
133 if err != nil {
134 return "", fmt.Errorf("failed to parse message time tag: %v", err)
135 }
136 t = t.In(time.Local)
137 } else {
138 t = time.Now()
139 }
140
141 f := ms.files[entity]
142
143 // TODO: handle non-monotonic clock behaviour
144 path := ms.logPath(network, entity, t)
145 if f == nil || f.Name() != path {
146 dir := filepath.Dir(path)
[558]147 if err := os.MkdirAll(dir, 0750); err != nil {
[439]148 return "", fmt.Errorf("failed to create message logs directory %q: %v", dir, err)
149 }
150
[608]151 ff, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0640)
[439]152 if err != nil {
153 return "", fmt.Errorf("failed to open message log file %q: %v", path, err)
154 }
155
[608]156 if f != nil {
157 f.Close()
158 }
159 f = &fsMessageStoreFile{File: ff}
[439]160 ms.files[entity] = f
161 }
162
[608]163 f.lastUse = time.Now()
164
165 if len(ms.files) > fsMessageStoreMaxFiles {
166 entities := make([]string, 0, len(ms.files))
167 for name := range ms.files {
168 entities = append(entities, name)
169 }
170 sort.Slice(entities, func(i, j int) bool {
171 a, b := entities[i], entities[j]
172 return ms.files[a].lastUse.Before(ms.files[b].lastUse)
173 })
174 entities = entities[0 : len(entities)-fsMessageStoreMaxFiles]
175 for _, name := range entities {
176 ms.files[name].Close()
177 delete(ms.files, name)
178 }
179 }
180
181 msgID, err := nextFSMsgID(network, entity, t, f.File)
[439]182 if err != nil {
183 return "", fmt.Errorf("failed to generate message ID: %v", err)
184 }
185
186 _, err = fmt.Fprintf(f, "[%02d:%02d:%02d] %s\n", t.Hour(), t.Minute(), t.Second(), s)
187 if err != nil {
188 return "", fmt.Errorf("failed to log message to %q: %v", f.Name(), err)
189 }
190
191 return msgID, nil
192}
193
194func (ms *fsMessageStore) Close() error {
195 var closeErr error
196 for _, f := range ms.files {
197 if err := f.Close(); err != nil {
198 closeErr = fmt.Errorf("failed to close message store: %v", err)
199 }
200 }
201 return closeErr
202}
203
204// formatMessage formats a message log line. It assumes a well-formed IRC
205// message.
206func formatMessage(msg *irc.Message) string {
207 switch strings.ToUpper(msg.Command) {
208 case "NICK":
209 return fmt.Sprintf("*** %s is now known as %s", msg.Prefix.Name, msg.Params[0])
210 case "JOIN":
211 return fmt.Sprintf("*** Joins: %s (%s@%s)", msg.Prefix.Name, msg.Prefix.User, msg.Prefix.Host)
212 case "PART":
213 var reason string
214 if len(msg.Params) > 1 {
215 reason = msg.Params[1]
216 }
217 return fmt.Sprintf("*** Parts: %s (%s@%s) (%s)", msg.Prefix.Name, msg.Prefix.User, msg.Prefix.Host, reason)
218 case "KICK":
219 nick := msg.Params[1]
220 var reason string
221 if len(msg.Params) > 2 {
222 reason = msg.Params[2]
223 }
224 return fmt.Sprintf("*** %s was kicked by %s (%s)", nick, msg.Prefix.Name, reason)
225 case "QUIT":
226 var reason string
227 if len(msg.Params) > 0 {
228 reason = msg.Params[0]
229 }
230 return fmt.Sprintf("*** Quits: %s (%s@%s) (%s)", msg.Prefix.Name, msg.Prefix.User, msg.Prefix.Host, reason)
231 case "TOPIC":
232 var topic string
233 if len(msg.Params) > 1 {
234 topic = msg.Params[1]
235 }
236 return fmt.Sprintf("*** %s changes topic to '%s'", msg.Prefix.Name, topic)
237 case "MODE":
238 return fmt.Sprintf("*** %s sets mode: %s", msg.Prefix.Name, strings.Join(msg.Params[1:], " "))
239 case "NOTICE":
240 return fmt.Sprintf("-%s- %s", msg.Prefix.Name, msg.Params[1])
241 case "PRIVMSG":
242 if cmd, params, ok := parseCTCPMessage(msg); ok && cmd == "ACTION" {
243 return fmt.Sprintf("* %s %s", msg.Prefix.Name, params)
244 } else {
245 return fmt.Sprintf("<%s> %s", msg.Prefix.Name, msg.Params[1])
246 }
247 default:
248 return ""
249 }
250}
251
252func parseMessage(line, entity string, ref time.Time) (*irc.Message, time.Time, error) {
253 var hour, minute, second int
254 _, err := fmt.Sscanf(line, "[%02d:%02d:%02d] ", &hour, &minute, &second)
255 if err != nil {
[515]256 return nil, time.Time{}, fmt.Errorf("malformed timestamp prefix: %v", err)
[439]257 }
258 line = line[11:]
259
260 var cmd, sender, text string
261 if strings.HasPrefix(line, "<") {
262 cmd = "PRIVMSG"
263 parts := strings.SplitN(line[1:], "> ", 2)
264 if len(parts) != 2 {
265 return nil, time.Time{}, nil
266 }
267 sender, text = parts[0], parts[1]
268 } else if strings.HasPrefix(line, "-") {
269 cmd = "NOTICE"
270 parts := strings.SplitN(line[1:], "- ", 2)
271 if len(parts) != 2 {
272 return nil, time.Time{}, nil
273 }
274 sender, text = parts[0], parts[1]
275 } else if strings.HasPrefix(line, "* ") {
276 cmd = "PRIVMSG"
277 parts := strings.SplitN(line[2:], " ", 2)
278 if len(parts) != 2 {
279 return nil, time.Time{}, nil
280 }
281 sender, text = parts[0], "\x01ACTION "+parts[1]+"\x01"
282 } else {
283 return nil, time.Time{}, nil
284 }
285
286 year, month, day := ref.Date()
287 t := time.Date(year, month, day, hour, minute, second, 0, time.Local)
288
289 msg := &irc.Message{
290 Tags: map[string]irc.TagValue{
291 "time": irc.TagValue(t.UTC().Format(serverTimeLayout)),
292 },
293 Prefix: &irc.Prefix{Name: sender},
294 Command: cmd,
295 Params: []string{entity, text},
296 }
297 return msg, t, nil
298}
299
[516]300func (ms *fsMessageStore) parseMessagesBefore(network *network, entity string, ref time.Time, end time.Time, limit int, afterOffset int64) ([]*irc.Message, error) {
[439]301 path := ms.logPath(network, entity, ref)
302 f, err := os.Open(path)
303 if err != nil {
304 if os.IsNotExist(err) {
305 return nil, nil
306 }
[515]307 return nil, fmt.Errorf("failed to parse messages before ref: %v", err)
[439]308 }
309 defer f.Close()
310
311 historyRing := make([]*irc.Message, limit)
312 cur := 0
313
314 sc := bufio.NewScanner(f)
315
316 if afterOffset >= 0 {
317 if _, err := f.Seek(afterOffset, io.SeekStart); err != nil {
318 return nil, nil
319 }
320 sc.Scan() // skip till next newline
321 }
322
323 for sc.Scan() {
324 msg, t, err := parseMessage(sc.Text(), entity, ref)
325 if err != nil {
326 return nil, err
[516]327 } else if msg == nil || !t.After(end) {
[439]328 continue
329 } else if !t.Before(ref) {
330 break
331 }
332
333 historyRing[cur%limit] = msg
334 cur++
335 }
336 if sc.Err() != nil {
[515]337 return nil, fmt.Errorf("failed to parse messages before ref: scanner error: %v", sc.Err())
[439]338 }
339
340 n := limit
341 if cur < limit {
342 n = cur
343 }
344 start := (cur - n + limit) % limit
345
346 if start+n <= limit { // ring doesnt wrap
347 return historyRing[start : start+n], nil
348 } else { // ring wraps
349 history := make([]*irc.Message, n)
350 r := copy(history, historyRing[start:])
351 copy(history[r:], historyRing[:n-r])
352 return history, nil
353 }
354}
355
[516]356func (ms *fsMessageStore) parseMessagesAfter(network *network, entity string, ref time.Time, end time.Time, limit int) ([]*irc.Message, error) {
[439]357 path := ms.logPath(network, entity, ref)
358 f, err := os.Open(path)
359 if err != nil {
360 if os.IsNotExist(err) {
361 return nil, nil
362 }
[515]363 return nil, fmt.Errorf("failed to parse messages after ref: %v", err)
[439]364 }
365 defer f.Close()
366
367 var history []*irc.Message
368 sc := bufio.NewScanner(f)
369 for sc.Scan() && len(history) < limit {
370 msg, t, err := parseMessage(sc.Text(), entity, ref)
371 if err != nil {
372 return nil, err
373 } else if msg == nil || !t.After(ref) {
374 continue
[516]375 } else if !t.Before(end) {
376 break
[439]377 }
378
379 history = append(history, msg)
380 }
381 if sc.Err() != nil {
[515]382 return nil, fmt.Errorf("failed to parse messages after ref: scanner error: %v", sc.Err())
[439]383 }
384
385 return history, nil
386}
387
[516]388func (ms *fsMessageStore) LoadBeforeTime(network *network, entity string, start time.Time, end time.Time, limit int) ([]*irc.Message, error) {
[610]389 start = start.In(time.Local)
390 end = end.In(time.Local)
[439]391 history := make([]*irc.Message, limit)
392 remaining := limit
393 tries := 0
[516]394 for remaining > 0 && tries < fsMessageStoreMaxTries && end.Before(start) {
395 buf, err := ms.parseMessagesBefore(network, entity, start, end, remaining, -1)
[439]396 if err != nil {
397 return nil, err
398 }
399 if len(buf) == 0 {
400 tries++
401 } else {
402 tries = 0
403 }
404 copy(history[remaining-len(buf):], buf)
405 remaining -= len(buf)
[516]406 year, month, day := start.Date()
407 start = time.Date(year, month, day, 0, 0, 0, 0, start.Location()).Add(-1)
[439]408 }
409
410 return history[remaining:], nil
411}
412
[516]413func (ms *fsMessageStore) LoadAfterTime(network *network, entity string, start time.Time, end time.Time, limit int) ([]*irc.Message, error) {
[610]414 start = start.In(time.Local)
415 end = end.In(time.Local)
[439]416 var history []*irc.Message
417 remaining := limit
418 tries := 0
[516]419 for remaining > 0 && tries < fsMessageStoreMaxTries && start.Before(end) {
420 buf, err := ms.parseMessagesAfter(network, entity, start, end, remaining)
[439]421 if err != nil {
422 return nil, err
423 }
424 if len(buf) == 0 {
425 tries++
426 } else {
427 tries = 0
428 }
429 history = append(history, buf...)
430 remaining -= len(buf)
[516]431 year, month, day := start.Date()
432 start = time.Date(year, month, day+1, 0, 0, 0, 0, start.Location())
[439]433 }
434 return history, nil
435}
436
437func (ms *fsMessageStore) LoadLatestID(network *network, entity, id string, limit int) ([]*irc.Message, error) {
438 var afterTime time.Time
439 var afterOffset int64
440 if id != "" {
[440]441 var idNet int64
442 var idEntity string
[439]443 var err error
[440]444 idNet, idEntity, afterTime, afterOffset, err = parseFSMsgID(id)
[439]445 if err != nil {
446 return nil, err
447 }
[440]448 if idNet != network.ID || idEntity != entity {
[439]449 return nil, fmt.Errorf("cannot find message ID: message ID doesn't match network/entity")
450 }
451 }
452
453 history := make([]*irc.Message, limit)
454 t := time.Now()
455 remaining := limit
456 tries := 0
457 for remaining > 0 && tries < fsMessageStoreMaxTries && !truncateDay(t).Before(afterTime) {
458 var offset int64 = -1
459 if afterOffset >= 0 && truncateDay(t).Equal(afterTime) {
460 offset = afterOffset
461 }
462
[516]463 buf, err := ms.parseMessagesBefore(network, entity, t, time.Time{}, remaining, offset)
[439]464 if err != nil {
465 return nil, err
466 }
467 if len(buf) == 0 {
468 tries++
469 } else {
470 tries = 0
471 }
472 copy(history[remaining-len(buf):], buf)
473 remaining -= len(buf)
474 year, month, day := t.Date()
475 t = time.Date(year, month, day, 0, 0, 0, 0, t.Location()).Add(-1)
476 }
477
478 return history[remaining:], nil
479}
[549]480
481func (ms *fsMessageStore) ListTargets(network *network, start, end time.Time, limit int) ([]chatHistoryTarget, error) {
[610]482 start = start.In(time.Local)
483 end = end.In(time.Local)
[591]484 rootPath := filepath.Join(ms.root, escapeFilename(network.GetName()))
[549]485 root, err := os.Open(rootPath)
[628]486 if os.IsNotExist(err) {
487 return nil, nil
488 } else if err != nil {
[549]489 return nil, err
490 }
491
492 // The returned targets are escaped, and there is no way to un-escape
493 // TODO: switch to ReadDir (Go 1.16+)
494 targetNames, err := root.Readdirnames(0)
495 root.Close()
496 if err != nil {
497 return nil, err
498 }
499
500 var targets []chatHistoryTarget
501 for _, target := range targetNames {
502 // target is already escaped here
503 targetPath := filepath.Join(rootPath, target)
504 targetDir, err := os.Open(targetPath)
505 if err != nil {
506 return nil, err
507 }
508
509 entries, err := targetDir.Readdir(0)
510 targetDir.Close()
511 if err != nil {
512 return nil, err
513 }
514
515 // We use mtime here, which may give imprecise or incorrect results
516 var t time.Time
517 for _, entry := range entries {
518 if entry.ModTime().After(t) {
519 t = entry.ModTime()
520 }
521 }
522
523 // The timestamps we get from logs have second granularity
524 t = truncateSecond(t)
525
526 // Filter out targets that don't fullfil the time bounds
527 if !isTimeBetween(t, start, end) {
528 continue
529 }
530
531 targets = append(targets, chatHistoryTarget{
532 Name: target,
533 LatestMessage: t,
534 })
535 }
536
537 // Sort targets by latest message time, backwards or forwards depending on
538 // the order of the time bounds
539 sort.Slice(targets, func(i, j int) bool {
540 t1, t2 := targets[i].LatestMessage, targets[j].LatestMessage
541 if start.Before(end) {
542 return t1.Before(t2)
543 } else {
544 return !t1.Before(t2)
545 }
546 })
547
548 // Truncate the result if necessary
549 if len(targets) > limit {
550 targets = targets[:limit]
551 }
552
553 return targets, nil
554}
555
[644]556func (ms *fsMessageStore) RenameNetwork(oldNet, newNet *network) error {
557 oldDir := filepath.Join(ms.root, escapeFilename(oldNet.GetName()))
558 newDir := filepath.Join(ms.root, escapeFilename(newNet.GetName()))
559 // Avoid loosing data by overwriting an existing directory
560 if _, err := os.Stat(newDir); err == nil {
561 return fmt.Errorf("destination %q already exists", newDir)
562 }
563 return os.Rename(oldDir, newDir)
564}
565
// truncateDay returns midnight at the start of t's calendar day, in t's own
// location.
func truncateDay(t time.Time) time.Time {
	y, m, d := t.Date()
	loc := t.Location()
	return time.Date(y, m, d, 0, 0, 0, 0, loc)
}
570
// truncateSecond drops the sub-second part of t, keeping t's location.
func truncateSecond(t time.Time) time.Time {
	y, mon, d := t.Date()
	h, min, s := t.Clock()
	return time.Date(y, mon, d, h, min, s, 0, t.Location())
}
575
// isTimeBetween reports whether t lies strictly between start and end. The
// two bounds may be given in either order.
func isTimeBetween(t, start, end time.Time) bool {
	lo, hi := start, end
	if hi.Before(lo) {
		lo, hi = hi, lo
	}
	return t.After(lo) && hi.After(t)
}
Note: See TracBrowser for help on using the repository browser.