source: code/trunk/logger.go@ 413

Last change on this file since 413 was 408, checked in by contact, 5 years ago

Introduce loadHistoryLatestID

This loads the latest messages from the logs, going back no further than a
given message ID. It is similar to the IRCv3 CHATHISTORY LATEST command [1].

[1]: https://github.com/ircv3/ircv3-specifications/blob/0c271a5f1df4f93b5ae4c7597422821c40a8cfeb/extensions/chathistory.md#latest

File size: 10.6 KB
Line 
1package soju
2
3import (
4 "bufio"
5 "fmt"
6 "io"
7 "os"
8 "path/filepath"
9 "strings"
10 "time"
11
12 "gopkg.in/irc.v3"
13)
14
// messageLoggerMaxTries bounds how many consecutive per-day log files without
// any matching message the history loaders visit before giving up.
const messageLoggerMaxTries = 100
16
// messageLogger appends messages for one entity of a network to per-day log
// files, keeping the most recently used file open between writes.
type messageLogger struct {
	network *network // network the logged entity belongs to
	entity  string   // entity whose messages are logged; part of the log path

	path string   // path of the log file currently held in file
	file *os.File // open handle for path; nil until the first successful Append
}
24
25func newMessageLogger(network *network, entity string) *messageLogger {
26 return &messageLogger{
27 network: network,
28 entity: entity,
29 }
30}
31
// escapeFilename replaces forward and backward slashes with dashes. logPath
// applies it to every name it turns into a path component.
var escapeFilename = strings.NewReplacer("/", "-", "\\", "-")
33
34func logPath(network *network, entity string, t time.Time) string {
35 user := network.user
36 srv := user.srv
37
38 year, month, day := t.Date()
39 filename := fmt.Sprintf("%04d-%02d-%02d.log", year, month, day)
40 return filepath.Join(srv.LogPath, escapeFilename.Replace(user.Username), escapeFilename.Replace(network.GetName()), escapeFilename.Replace(entity), filename)
41}
42
// parseMsgID splits a message ID produced by formatMsgID back into its
// parts: network name, entity, the day of the log file (midnight, local
// time) and the byte offset inside that file.
//
// An ID that does not match the "<network> <entity> <YYYY-MM-DD> <offset>"
// layout yields an "invalid message ID" error and zero values.
func parseMsgID(s string) (network, entity string, t time.Time, offset int64, err error) {
	var y, m, d int
	if _, scanErr := fmt.Sscanf(s, "%s %s %04d-%02d-%02d %d", &network, &entity, &y, &m, &d, &offset); scanErr != nil {
		return "", "", time.Time{}, 0, fmt.Errorf("invalid message ID: %v", scanErr)
	}
	day := time.Date(y, time.Month(m), d, 0, 0, 0, 0, time.Local)
	return network, entity, day, offset, nil
}
52
// formatMsgID builds a message ID of the form
// "<network> <entity> <YYYY-MM-DD> <offset>", where the date is the calendar
// day of t and offset is a byte position inside that day's log file.
func formatMsgID(network, entity string, t time.Time, offset int64) string {
	y, m, d := t.Date()
	date := fmt.Sprintf("%04d-%02d-%02d", y, int(m), d)
	return fmt.Sprintf("%s %s %s %d", network, entity, date, offset)
}
57
58// nextMsgID queries the message ID for the next message to be written to f.
59func nextMsgID(network *network, entity string, t time.Time, f *os.File) (string, error) {
60 offset, err := f.Seek(0, io.SeekEnd)
61 if err != nil {
62 return "", err
63 }
64 return formatMsgID(network.GetName(), entity, t, offset), nil
65}
66
67// lastMsgID queries the last message ID for the given network, entity and
68// date. The message ID returned may not refer to a valid message, but can be
69// used in history queries.
70func lastMsgID(network *network, entity string, t time.Time) (string, error) {
71 p := logPath(network, entity, t)
72 fi, err := os.Stat(p)
73 if os.IsNotExist(err) {
74 return formatMsgID(network.GetName(), entity, t, -1), nil
75 } else if err != nil {
76 return "", err
77 }
78 return formatMsgID(network.GetName(), entity, t, fi.Size()-1), nil
79}
80
81func (ml *messageLogger) Append(msg *irc.Message) (string, error) {
82 s := formatMessage(msg)
83 if s == "" {
84 return "", nil
85 }
86
87 var t time.Time
88 if tag, ok := msg.Tags["time"]; ok {
89 var err error
90 t, err = time.Parse(serverTimeLayout, string(tag))
91 if err != nil {
92 return "", fmt.Errorf("failed to parse message time tag: %v", err)
93 }
94 t = t.In(time.Local)
95 } else {
96 t = time.Now()
97 }
98
99 // TODO: enforce maximum open file handles (LRU cache of file handles)
100 // TODO: handle non-monotonic clock behaviour
101 path := logPath(ml.network, ml.entity, t)
102 if ml.path != path {
103 if ml.file != nil {
104 ml.file.Close()
105 }
106
107 dir := filepath.Dir(path)
108 if err := os.MkdirAll(dir, 0700); err != nil {
109 return "", fmt.Errorf("failed to create logs directory %q: %v", dir, err)
110 }
111
112 f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0600)
113 if err != nil {
114 return "", fmt.Errorf("failed to open log file %q: %v", path, err)
115 }
116
117 ml.path = path
118 ml.file = f
119 }
120
121 msgID, err := nextMsgID(ml.network, ml.entity, t, ml.file)
122 if err != nil {
123 return "", fmt.Errorf("failed to generate message ID: %v", err)
124 }
125
126 _, err = fmt.Fprintf(ml.file, "[%02d:%02d:%02d] %s\n", t.Hour(), t.Minute(), t.Second(), s)
127 if err != nil {
128 return "", fmt.Errorf("failed to log message to %q: %v", ml.path, err)
129 }
130 return msgID, nil
131}
132
133func (ml *messageLogger) Close() error {
134 if ml.file == nil {
135 return nil
136 }
137 return ml.file.Close()
138}
139
140// formatMessage formats a message log line. It assumes a well-formed IRC
141// message.
142func formatMessage(msg *irc.Message) string {
143 switch strings.ToUpper(msg.Command) {
144 case "NICK":
145 return fmt.Sprintf("*** %s is now known as %s", msg.Prefix.Name, msg.Params[0])
146 case "JOIN":
147 return fmt.Sprintf("*** Joins: %s (%s@%s)", msg.Prefix.Name, msg.Prefix.User, msg.Prefix.Host)
148 case "PART":
149 var reason string
150 if len(msg.Params) > 1 {
151 reason = msg.Params[1]
152 }
153 return fmt.Sprintf("*** Parts: %s (%s@%s) (%s)", msg.Prefix.Name, msg.Prefix.User, msg.Prefix.Host, reason)
154 case "KICK":
155 nick := msg.Params[1]
156 var reason string
157 if len(msg.Params) > 2 {
158 reason = msg.Params[2]
159 }
160 return fmt.Sprintf("*** %s was kicked by %s (%s)", nick, msg.Prefix.Name, reason)
161 case "QUIT":
162 var reason string
163 if len(msg.Params) > 0 {
164 reason = msg.Params[0]
165 }
166 return fmt.Sprintf("*** Quits: %s (%s@%s) (%s)", msg.Prefix.Name, msg.Prefix.User, msg.Prefix.Host, reason)
167 case "TOPIC":
168 var topic string
169 if len(msg.Params) > 1 {
170 topic = msg.Params[1]
171 }
172 return fmt.Sprintf("*** %s changes topic to '%s'", msg.Prefix.Name, topic)
173 case "MODE":
174 return fmt.Sprintf("*** %s sets mode: %s", msg.Prefix.Name, strings.Join(msg.Params[1:], " "))
175 case "NOTICE":
176 return fmt.Sprintf("-%s- %s", msg.Prefix.Name, msg.Params[1])
177 case "PRIVMSG":
178 if cmd, params, ok := parseCTCPMessage(msg); ok && cmd == "ACTION" {
179 return fmt.Sprintf("* %s %s", msg.Prefix.Name, params)
180 } else {
181 return fmt.Sprintf("<%s> %s", msg.Prefix.Name, msg.Params[1])
182 }
183 default:
184 return ""
185 }
186}
187
188func parseMessage(line, entity string, ref time.Time) (*irc.Message, time.Time, error) {
189 var hour, minute, second int
190 _, err := fmt.Sscanf(line, "[%02d:%02d:%02d] ", &hour, &minute, &second)
191 if err != nil {
192 return nil, time.Time{}, err
193 }
194 line = line[11:]
195
196 var cmd, sender, text string
197 if strings.HasPrefix(line, "<") {
198 cmd = "PRIVMSG"
199 parts := strings.SplitN(line[1:], "> ", 2)
200 if len(parts) != 2 {
201 return nil, time.Time{}, nil
202 }
203 sender, text = parts[0], parts[1]
204 } else if strings.HasPrefix(line, "-") {
205 cmd = "NOTICE"
206 parts := strings.SplitN(line[1:], "- ", 2)
207 if len(parts) != 2 {
208 return nil, time.Time{}, nil
209 }
210 sender, text = parts[0], parts[1]
211 } else if strings.HasPrefix(line, "* ") {
212 cmd = "PRIVMSG"
213 parts := strings.SplitN(line[2:], " ", 2)
214 if len(parts) != 2 {
215 return nil, time.Time{}, nil
216 }
217 sender, text = parts[0], "\x01ACTION "+parts[1]+"\x01"
218 } else {
219 return nil, time.Time{}, nil
220 }
221
222 year, month, day := ref.Date()
223 t := time.Date(year, month, day, hour, minute, second, 0, time.Local)
224
225 msg := &irc.Message{
226 Tags: map[string]irc.TagValue{
227 "time": irc.TagValue(t.UTC().Format(serverTimeLayout)),
228 },
229 Prefix: &irc.Prefix{Name: sender},
230 Command: cmd,
231 Params: []string{entity, text},
232 }
233 return msg, t, nil
234}
235
236func parseMessagesBefore(network *network, entity string, ref time.Time, limit int, afterOffset int64) ([]*irc.Message, error) {
237 path := logPath(network, entity, ref)
238 f, err := os.Open(path)
239 if err != nil {
240 if os.IsNotExist(err) {
241 return nil, nil
242 }
243 return nil, err
244 }
245 defer f.Close()
246
247 historyRing := make([]*irc.Message, limit)
248 cur := 0
249
250 sc := bufio.NewScanner(f)
251
252 if afterOffset >= 0 {
253 if _, err := f.Seek(afterOffset, io.SeekStart); err != nil {
254 return nil, nil
255 }
256 sc.Scan() // skip till next newline
257 }
258
259 for sc.Scan() {
260 msg, t, err := parseMessage(sc.Text(), entity, ref)
261 if err != nil {
262 return nil, err
263 } else if msg == nil {
264 continue
265 } else if !t.Before(ref) {
266 break
267 }
268
269 historyRing[cur%limit] = msg
270 cur++
271 }
272 if sc.Err() != nil {
273 return nil, sc.Err()
274 }
275
276 n := limit
277 if cur < limit {
278 n = cur
279 }
280 start := (cur - n + limit) % limit
281
282 if start+n <= limit { // ring doesnt wrap
283 return historyRing[start : start+n], nil
284 } else { // ring wraps
285 history := make([]*irc.Message, n)
286 r := copy(history, historyRing[start:])
287 copy(history[r:], historyRing[:n-r])
288 return history, nil
289 }
290}
291
292func parseMessagesAfter(network *network, entity string, ref time.Time, limit int) ([]*irc.Message, error) {
293 path := logPath(network, entity, ref)
294 f, err := os.Open(path)
295 if err != nil {
296 if os.IsNotExist(err) {
297 return nil, nil
298 }
299 return nil, err
300 }
301 defer f.Close()
302
303 var history []*irc.Message
304 sc := bufio.NewScanner(f)
305 for sc.Scan() && len(history) < limit {
306 msg, t, err := parseMessage(sc.Text(), entity, ref)
307 if err != nil {
308 return nil, err
309 } else if msg == nil || !t.After(ref) {
310 continue
311 }
312
313 history = append(history, msg)
314 }
315 if sc.Err() != nil {
316 return nil, sc.Err()
317 }
318
319 return history, nil
320}
321
322func loadHistoryBeforeTime(network *network, entity string, t time.Time, limit int) ([]*irc.Message, error) {
323 history := make([]*irc.Message, limit)
324 remaining := limit
325 tries := 0
326 for remaining > 0 && tries < messageLoggerMaxTries {
327 buf, err := parseMessagesBefore(network, entity, t, remaining, -1)
328 if err != nil {
329 return nil, err
330 }
331 if len(buf) == 0 {
332 tries++
333 } else {
334 tries = 0
335 }
336 copy(history[remaining-len(buf):], buf)
337 remaining -= len(buf)
338 year, month, day := t.Date()
339 t = time.Date(year, month, day, 0, 0, 0, 0, t.Location()).Add(-1)
340 }
341
342 return history[remaining:], nil
343}
344
345func loadHistoryAfterTime(network *network, entity string, t time.Time, limit int) ([]*irc.Message, error) {
346 var history []*irc.Message
347 remaining := limit
348 tries := 0
349 now := time.Now()
350 for remaining > 0 && tries < messageLoggerMaxTries && t.Before(now) {
351 buf, err := parseMessagesAfter(network, entity, t, remaining)
352 if err != nil {
353 return nil, err
354 }
355 if len(buf) == 0 {
356 tries++
357 } else {
358 tries = 0
359 }
360 history = append(history, buf...)
361 remaining -= len(buf)
362 year, month, day := t.Date()
363 t = time.Date(year, month, day+1, 0, 0, 0, 0, t.Location())
364 }
365 return history, nil
366}
367
// truncateDay returns midnight at the start of t's calendar day, in t's own
// location.
func truncateDay(t time.Time) time.Time {
	loc := t.Location()
	y, m, d := t.Date()
	midnight := time.Date(y, m, d, 0, 0, 0, 0, loc)
	return midnight
}
372
373func loadHistoryLatestID(network *network, entity, id string, limit int) ([]*irc.Message, error) {
374 var afterTime time.Time
375 var afterOffset int64
376 if id != "" {
377 var idNet, idEntity string
378 var err error
379 idNet, idEntity, afterTime, afterOffset, err = parseMsgID(id)
380 if err != nil {
381 return nil, err
382 }
383 if idNet != network.GetName() || idEntity != entity {
384 return nil, fmt.Errorf("cannot find message ID: message ID doesn't match network/entity")
385 }
386 }
387
388 history := make([]*irc.Message, limit)
389 t := time.Now()
390 remaining := limit
391 tries := 0
392 for remaining > 0 && tries < messageLoggerMaxTries && !truncateDay(t).Before(afterTime) {
393 var offset int64 = -1
394 if afterOffset >= 0 && truncateDay(t).Equal(afterTime) {
395 offset = afterOffset
396 }
397
398 buf, err := parseMessagesBefore(network, entity, t, remaining, offset)
399 if err != nil {
400 return nil, err
401 }
402 if len(buf) == 0 {
403 tries++
404 } else {
405 tries = 0
406 }
407 copy(history[remaining-len(buf):], buf)
408 remaining -= len(buf)
409 year, month, day := t.Date()
410 t = time.Date(year, month, day, 0, 0, 0, 0, t.Location()).Add(-1)
411 }
412
413 return history[remaining:], nil
414}
Note: See TracBrowser for help on using the repository browser.