source: code/trunk/logger.go@ 414

Last change on this file since 414 was 408, checked in by contact, 5 years ago

Introduce loadHistoryLatestID

This loads latest messages from logs up to a given message ID. This is
similar to the IRCv3 CHATHISTORY LATEST command [1].

[1]: https://github.com/ircv3/ircv3-specifications/blob/0c271a5f1df4f93b5ae4c7597422821c40a8cfeb/extensions/chathistory.md#latest

File size: 10.6 KB
RevLine 
[215]1package soju
2
3import (
[319]4 "bufio"
[215]5 "fmt"
[407]6 "io"
[215]7 "os"
8 "path/filepath"
9 "strings"
10 "time"
11
12 "gopkg.in/irc.v3"
13)
14
// messageLoggerMaxTries is the number of consecutive per-day log files that
// may yield no message before a history lookup stops walking further days.
const messageLoggerMaxTries = 100
16
[215]17type messageLogger struct {
[248]18 network *network
19 entity string
[215]20
[247]21 path string
22 file *os.File
[215]23}
24
[248]25func newMessageLogger(network *network, entity string) *messageLogger {
[215]26 return &messageLogger{
[248]27 network: network,
28 entity: entity,
[215]29 }
30}
31
// escapeFilename rewrites both kinds of path separator ("/" and "\") to "-"
// so that a name does not span several path components.
var escapeFilename = strings.NewReplacer(
	"/", "-",
	"\\", "-",
)
33
[247]34func logPath(network *network, entity string, t time.Time) string {
35 user := network.user
36 srv := user.srv
37
38 year, month, day := t.Date()
39 filename := fmt.Sprintf("%04d-%02d-%02d.log", year, month, day)
[397]40 return filepath.Join(srv.LogPath, escapeFilename.Replace(user.Username), escapeFilename.Replace(network.GetName()), escapeFilename.Replace(entity), filename)
[247]41}
42
// parseMsgID splits a message ID of the form
// "<network> <entity> YYYY-MM-DD <offset>" into its components. The returned
// time is midnight of the encoded day in the local time zone. An error is
// returned when s does not match that layout.
func parseMsgID(s string) (network, entity string, t time.Time, offset int64, err error) {
	var (
		netName, entName string
		y, m, d          int
		off              int64
	)
	if _, scanErr := fmt.Sscanf(s, "%s %s %04d-%02d-%02d %d", &netName, &entName, &y, &m, &d, &off); scanErr != nil {
		return "", "", time.Time{}, 0, fmt.Errorf("invalid message ID: %v", scanErr)
	}

	midnight := time.Date(y, time.Month(m), d, 0, 0, 0, 0, time.Local)
	return netName, entName, midnight, off, nil
}
52
// formatMsgID builds a message ID of the form
// "<network> <entity> YYYY-MM-DD <offset>", using the date of t.
func formatMsgID(network, entity string, t time.Time, offset int64) string {
	date := fmt.Sprintf("%04d-%02d-%02d", t.Year(), int(t.Month()), t.Day())
	return fmt.Sprintf("%s %s %s %d", network, entity, date, offset)
}
57
58// nextMsgID queries the message ID for the next message to be written to f.
59func nextMsgID(network *network, entity string, t time.Time, f *os.File) (string, error) {
60 offset, err := f.Seek(0, io.SeekEnd)
61 if err != nil {
62 return "", err
63 }
64 return formatMsgID(network.GetName(), entity, t, offset), nil
65}
66
67// lastMsgID queries the last message ID for the given network, entity and
68// date. The message ID returned may not refer to a valid message, but can be
69// used in history queries.
70func lastMsgID(network *network, entity string, t time.Time) (string, error) {
71 p := logPath(network, entity, t)
72 fi, err := os.Stat(p)
73 if os.IsNotExist(err) {
74 return formatMsgID(network.GetName(), entity, t, -1), nil
75 } else if err != nil {
76 return "", err
77 }
78 return formatMsgID(network.GetName(), entity, t, fi.Size()-1), nil
79}
80
81func (ml *messageLogger) Append(msg *irc.Message) (string, error) {
[215]82 s := formatMessage(msg)
83 if s == "" {
[407]84 return "", nil
[215]85 }
86
[250]87 var t time.Time
88 if tag, ok := msg.Tags["time"]; ok {
89 var err error
90 t, err = time.Parse(serverTimeLayout, string(tag))
91 if err != nil {
[407]92 return "", fmt.Errorf("failed to parse message time tag: %v", err)
[250]93 }
94 t = t.In(time.Local)
95 } else {
96 t = time.Now()
97 }
[216]98
[215]99 // TODO: enforce maximum open file handles (LRU cache of file handles)
100 // TODO: handle non-monotonic clock behaviour
[250]101 path := logPath(ml.network, ml.entity, t)
[247]102 if ml.path != path {
[215]103 if ml.file != nil {
104 ml.file.Close()
105 }
106
[247]107 dir := filepath.Dir(path)
[215]108 if err := os.MkdirAll(dir, 0700); err != nil {
[407]109 return "", fmt.Errorf("failed to create logs directory %q: %v", dir, err)
[215]110 }
111
112 f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0600)
113 if err != nil {
[407]114 return "", fmt.Errorf("failed to open log file %q: %v", path, err)
[215]115 }
116
[247]117 ml.path = path
[215]118 ml.file = f
119 }
120
[407]121 msgID, err := nextMsgID(ml.network, ml.entity, t, ml.file)
[215]122 if err != nil {
[407]123 return "", fmt.Errorf("failed to generate message ID: %v", err)
[215]124 }
[407]125
126 _, err = fmt.Fprintf(ml.file, "[%02d:%02d:%02d] %s\n", t.Hour(), t.Minute(), t.Second(), s)
127 if err != nil {
128 return "", fmt.Errorf("failed to log message to %q: %v", ml.path, err)
129 }
130 return msgID, nil
[215]131}
132
133func (ml *messageLogger) Close() error {
134 if ml.file == nil {
135 return nil
136 }
137 return ml.file.Close()
138}
139
140// formatMessage formats a message log line. It assumes a well-formed IRC
141// message.
142func formatMessage(msg *irc.Message) string {
143 switch strings.ToUpper(msg.Command) {
144 case "NICK":
145 return fmt.Sprintf("*** %s is now known as %s", msg.Prefix.Name, msg.Params[0])
146 case "JOIN":
147 return fmt.Sprintf("*** Joins: %s (%s@%s)", msg.Prefix.Name, msg.Prefix.User, msg.Prefix.Host)
148 case "PART":
149 var reason string
150 if len(msg.Params) > 1 {
151 reason = msg.Params[1]
152 }
153 return fmt.Sprintf("*** Parts: %s (%s@%s) (%s)", msg.Prefix.Name, msg.Prefix.User, msg.Prefix.Host, reason)
154 case "KICK":
155 nick := msg.Params[1]
156 var reason string
157 if len(msg.Params) > 2 {
158 reason = msg.Params[2]
159 }
160 return fmt.Sprintf("*** %s was kicked by %s (%s)", nick, msg.Prefix.Name, reason)
161 case "QUIT":
162 var reason string
163 if len(msg.Params) > 0 {
164 reason = msg.Params[0]
165 }
166 return fmt.Sprintf("*** Quits: %s (%s@%s) (%s)", msg.Prefix.Name, msg.Prefix.User, msg.Prefix.Host, reason)
[235]167 case "TOPIC":
168 var topic string
169 if len(msg.Params) > 1 {
170 topic = msg.Params[1]
171 }
172 return fmt.Sprintf("*** %s changes topic to '%s'", msg.Prefix.Name, topic)
[215]173 case "MODE":
174 return fmt.Sprintf("*** %s sets mode: %s", msg.Prefix.Name, strings.Join(msg.Params[1:], " "))
[234]175 case "NOTICE":
176 return fmt.Sprintf("-%s- %s", msg.Prefix.Name, msg.Params[1])
177 case "PRIVMSG":
[392]178 if cmd, params, ok := parseCTCPMessage(msg); ok && cmd == "ACTION" {
179 return fmt.Sprintf("* %s %s", msg.Prefix.Name, params)
180 } else {
181 return fmt.Sprintf("<%s> %s", msg.Prefix.Name, msg.Params[1])
182 }
[215]183 default:
184 return ""
185 }
186}
[319]187
[360]188func parseMessage(line, entity string, ref time.Time) (*irc.Message, time.Time, error) {
189 var hour, minute, second int
190 _, err := fmt.Sscanf(line, "[%02d:%02d:%02d] ", &hour, &minute, &second)
191 if err != nil {
192 return nil, time.Time{}, err
193 }
194 line = line[11:]
195
[392]196 var cmd, sender, text string
[391]197 if strings.HasPrefix(line, "<") {
198 cmd = "PRIVMSG"
[392]199 parts := strings.SplitN(line[1:], "> ", 2)
200 if len(parts) != 2 {
201 return nil, time.Time{}, nil
202 }
203 sender, text = parts[0], parts[1]
[391]204 } else if strings.HasPrefix(line, "-") {
205 cmd = "NOTICE"
[392]206 parts := strings.SplitN(line[1:], "- ", 2)
207 if len(parts) != 2 {
208 return nil, time.Time{}, nil
209 }
210 sender, text = parts[0], parts[1]
211 } else if strings.HasPrefix(line, "* ") {
212 cmd = "PRIVMSG"
213 parts := strings.SplitN(line[2:], " ", 2)
214 if len(parts) != 2 {
215 return nil, time.Time{}, nil
216 }
217 sender, text = parts[0], "\x01ACTION "+parts[1]+"\x01"
[391]218 } else {
[360]219 return nil, time.Time{}, nil
220 }
[391]221
[360]222 year, month, day := ref.Date()
223 t := time.Date(year, month, day, hour, minute, second, 0, time.Local)
224
225 msg := &irc.Message{
226 Tags: map[string]irc.TagValue{
227 "time": irc.TagValue(t.UTC().Format(serverTimeLayout)),
228 },
[362]229 Prefix: &irc.Prefix{Name: sender},
[391]230 Command: cmd,
[360]231 Params: []string{entity, text},
232 }
233 return msg, t, nil
234}
235
[408]236func parseMessagesBefore(network *network, entity string, ref time.Time, limit int, afterOffset int64) ([]*irc.Message, error) {
[360]237 path := logPath(network, entity, ref)
[319]238 f, err := os.Open(path)
239 if err != nil {
240 if os.IsNotExist(err) {
241 return nil, nil
242 }
243 return nil, err
244 }
245 defer f.Close()
246
247 historyRing := make([]*irc.Message, limit)
248 cur := 0
249
250 sc := bufio.NewScanner(f)
[408]251
252 if afterOffset >= 0 {
253 if _, err := f.Seek(afterOffset, io.SeekStart); err != nil {
254 return nil, nil
255 }
256 sc.Scan() // skip till next newline
257 }
258
[319]259 for sc.Scan() {
[360]260 msg, t, err := parseMessage(sc.Text(), entity, ref)
[319]261 if err != nil {
262 return nil, err
[360]263 } else if msg == nil {
[319]264 continue
[360]265 } else if !t.Before(ref) {
[319]266 break
267 }
268
[360]269 historyRing[cur%limit] = msg
[319]270 cur++
271 }
272 if sc.Err() != nil {
273 return nil, sc.Err()
274 }
275
276 n := limit
277 if cur < limit {
278 n = cur
279 }
280 start := (cur - n + limit) % limit
281
282 if start+n <= limit { // ring doesnt wrap
283 return historyRing[start : start+n], nil
284 } else { // ring wraps
285 history := make([]*irc.Message, n)
286 r := copy(history, historyRing[start:])
287 copy(history[r:], historyRing[:n-r])
288 return history, nil
289 }
290}
[360]291
292func parseMessagesAfter(network *network, entity string, ref time.Time, limit int) ([]*irc.Message, error) {
293 path := logPath(network, entity, ref)
294 f, err := os.Open(path)
295 if err != nil {
296 if os.IsNotExist(err) {
297 return nil, nil
298 }
299 return nil, err
300 }
301 defer f.Close()
302
303 var history []*irc.Message
304 sc := bufio.NewScanner(f)
305 for sc.Scan() && len(history) < limit {
306 msg, t, err := parseMessage(sc.Text(), entity, ref)
307 if err != nil {
308 return nil, err
309 } else if msg == nil || !t.After(ref) {
310 continue
311 }
312
313 history = append(history, msg)
314 }
315 if sc.Err() != nil {
316 return nil, sc.Err()
317 }
318
319 return history, nil
320}
[387]321
322func loadHistoryBeforeTime(network *network, entity string, t time.Time, limit int) ([]*irc.Message, error) {
323 history := make([]*irc.Message, limit)
324 remaining := limit
325 tries := 0
326 for remaining > 0 && tries < messageLoggerMaxTries {
[408]327 buf, err := parseMessagesBefore(network, entity, t, remaining, -1)
[387]328 if err != nil {
329 return nil, err
330 }
331 if len(buf) == 0 {
332 tries++
333 } else {
334 tries = 0
335 }
336 copy(history[remaining-len(buf):], buf)
337 remaining -= len(buf)
338 year, month, day := t.Date()
339 t = time.Date(year, month, day, 0, 0, 0, 0, t.Location()).Add(-1)
340 }
341
342 return history[remaining:], nil
343}
344
345func loadHistoryAfterTime(network *network, entity string, t time.Time, limit int) ([]*irc.Message, error) {
346 var history []*irc.Message
347 remaining := limit
348 tries := 0
349 now := time.Now()
350 for remaining > 0 && tries < messageLoggerMaxTries && t.Before(now) {
351 buf, err := parseMessagesAfter(network, entity, t, remaining)
352 if err != nil {
353 return nil, err
354 }
355 if len(buf) == 0 {
356 tries++
357 } else {
358 tries = 0
359 }
360 history = append(history, buf...)
361 remaining -= len(buf)
362 year, month, day := t.Date()
363 t = time.Date(year, month, day+1, 0, 0, 0, 0, t.Location())
364 }
365 return history, nil
366}
[408]367
// truncateDay returns midnight of the day containing t, in t's own location.
func truncateDay(t time.Time) time.Time {
	return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, t.Location())
}
372
373func loadHistoryLatestID(network *network, entity, id string, limit int) ([]*irc.Message, error) {
374 var afterTime time.Time
375 var afterOffset int64
376 if id != "" {
377 var idNet, idEntity string
378 var err error
379 idNet, idEntity, afterTime, afterOffset, err = parseMsgID(id)
380 if err != nil {
381 return nil, err
382 }
383 if idNet != network.GetName() || idEntity != entity {
384 return nil, fmt.Errorf("cannot find message ID: message ID doesn't match network/entity")
385 }
386 }
387
388 history := make([]*irc.Message, limit)
389 t := time.Now()
390 remaining := limit
391 tries := 0
392 for remaining > 0 && tries < messageLoggerMaxTries && !truncateDay(t).Before(afterTime) {
393 var offset int64 = -1
394 if afterOffset >= 0 && truncateDay(t).Equal(afterTime) {
395 offset = afterOffset
396 }
397
398 buf, err := parseMessagesBefore(network, entity, t, remaining, offset)
399 if err != nil {
400 return nil, err
401 }
402 if len(buf) == 0 {
403 tries++
404 } else {
405 tries = 0
406 }
407 copy(history[remaining-len(buf):], buf)
408 remaining -= len(buf)
409 year, month, day := t.Date()
410 t = time.Date(year, month, day, 0, 0, 0, 0, t.Location()).Add(-1)
411 }
412
413 return history[remaining:], nil
414}
Note: See TracBrowser for help on using the repository browser.