source: code/trunk/logger.go@ 407

Last change on this file since 407 was 407, checked in by contact, 5 years ago

Introduce internal message IDs

For now, these can be used as cursors in the logs. Future patches will
introduce functions that perform log queries with message IDs.

The IDs are stateless tokens containing all the information required to
refer to an on-disk log line: network name, entity name, date and byte
offset. The byte offset doesn't need to point to the first byte of the
line; any byte of the line will do. (Note that this means message IDs aren't
necessarily unique; we may want to change that in the future.)

These internal message IDs are not exposed to clients because we don't
support upstream message IDs yet.

File size: 9.2 KB
Line 
1package soju
2
3import (
4 "bufio"
5 "fmt"
6 "io"
7 "os"
8 "path/filepath"
9 "strings"
10 "time"
11
12 "gopkg.in/irc.v3"
13)
14
// messageLoggerMaxTries bounds how many consecutive days without any matching
// message the history loaders inspect before giving up. The counter is reset
// every time a day yields at least one message.
const messageLoggerMaxTries = 100
16
17type messageLogger struct {
18 network *network
19 entity string
20
21 path string
22 file *os.File
23}
24
25func newMessageLogger(network *network, entity string) *messageLogger {
26 return &messageLogger{
27 network: network,
28 entity: entity,
29 }
30}
31
// escapeFilename replaces both slash and backslash with a dash so that a
// user-controlled name cannot introduce extra path components.
var escapeFilename = strings.NewReplacer(
	"/", "-",
	`\`, "-",
)
33
34func logPath(network *network, entity string, t time.Time) string {
35 user := network.user
36 srv := user.srv
37
38 year, month, day := t.Date()
39 filename := fmt.Sprintf("%04d-%02d-%02d.log", year, month, day)
40 return filepath.Join(srv.LogPath, escapeFilename.Replace(user.Username), escapeFilename.Replace(network.GetName()), escapeFilename.Replace(entity), filename)
41}
42
// parseMsgID splits an internal message ID, as produced by formatMsgID, into
// its network name, entity name, date and byte offset. The returned time is
// midnight of the encoded date in the local time zone. On malformed input all
// results are zero and a non-nil error is returned.
func parseMsgID(s string) (network, entity string, t time.Time, offset int64, err error) {
	var y, m, d int
	if _, scanErr := fmt.Sscanf(s, "%s %s %04d-%02d-%02d %d", &network, &entity, &y, &m, &d, &offset); scanErr != nil {
		return "", "", time.Time{}, 0, fmt.Errorf("invalid message ID: %v", scanErr)
	}
	midnight := time.Date(y, time.Month(m), d, 0, 0, 0, 0, time.Local)
	return network, entity, midnight, offset, nil
}
52
// formatMsgID builds an internal message ID of the form
// "<network> <entity> <YYYY-MM-DD> <offset>". The date is the calendar day of
// t in t's own location.
func formatMsgID(network, entity string, t time.Time, offset int64) string {
	date := fmt.Sprintf("%04d-%02d-%02d", t.Year(), int(t.Month()), t.Day())
	return fmt.Sprintf("%s %s %s %d", network, entity, date, offset)
}
57
58// nextMsgID queries the message ID for the next message to be written to f.
59func nextMsgID(network *network, entity string, t time.Time, f *os.File) (string, error) {
60 offset, err := f.Seek(0, io.SeekEnd)
61 if err != nil {
62 return "", err
63 }
64 return formatMsgID(network.GetName(), entity, t, offset), nil
65}
66
67// lastMsgID queries the last message ID for the given network, entity and
68// date. The message ID returned may not refer to a valid message, but can be
69// used in history queries.
70func lastMsgID(network *network, entity string, t time.Time) (string, error) {
71 p := logPath(network, entity, t)
72 fi, err := os.Stat(p)
73 if os.IsNotExist(err) {
74 return formatMsgID(network.GetName(), entity, t, -1), nil
75 } else if err != nil {
76 return "", err
77 }
78 return formatMsgID(network.GetName(), entity, t, fi.Size()-1), nil
79}
80
81func (ml *messageLogger) Append(msg *irc.Message) (string, error) {
82 s := formatMessage(msg)
83 if s == "" {
84 return "", nil
85 }
86
87 var t time.Time
88 if tag, ok := msg.Tags["time"]; ok {
89 var err error
90 t, err = time.Parse(serverTimeLayout, string(tag))
91 if err != nil {
92 return "", fmt.Errorf("failed to parse message time tag: %v", err)
93 }
94 t = t.In(time.Local)
95 } else {
96 t = time.Now()
97 }
98
99 // TODO: enforce maximum open file handles (LRU cache of file handles)
100 // TODO: handle non-monotonic clock behaviour
101 path := logPath(ml.network, ml.entity, t)
102 if ml.path != path {
103 if ml.file != nil {
104 ml.file.Close()
105 }
106
107 dir := filepath.Dir(path)
108 if err := os.MkdirAll(dir, 0700); err != nil {
109 return "", fmt.Errorf("failed to create logs directory %q: %v", dir, err)
110 }
111
112 f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0600)
113 if err != nil {
114 return "", fmt.Errorf("failed to open log file %q: %v", path, err)
115 }
116
117 ml.path = path
118 ml.file = f
119 }
120
121 msgID, err := nextMsgID(ml.network, ml.entity, t, ml.file)
122 if err != nil {
123 return "", fmt.Errorf("failed to generate message ID: %v", err)
124 }
125
126 _, err = fmt.Fprintf(ml.file, "[%02d:%02d:%02d] %s\n", t.Hour(), t.Minute(), t.Second(), s)
127 if err != nil {
128 return "", fmt.Errorf("failed to log message to %q: %v", ml.path, err)
129 }
130 return msgID, nil
131}
132
133func (ml *messageLogger) Close() error {
134 if ml.file == nil {
135 return nil
136 }
137 return ml.file.Close()
138}
139
140// formatMessage formats a message log line. It assumes a well-formed IRC
141// message.
142func formatMessage(msg *irc.Message) string {
143 switch strings.ToUpper(msg.Command) {
144 case "NICK":
145 return fmt.Sprintf("*** %s is now known as %s", msg.Prefix.Name, msg.Params[0])
146 case "JOIN":
147 return fmt.Sprintf("*** Joins: %s (%s@%s)", msg.Prefix.Name, msg.Prefix.User, msg.Prefix.Host)
148 case "PART":
149 var reason string
150 if len(msg.Params) > 1 {
151 reason = msg.Params[1]
152 }
153 return fmt.Sprintf("*** Parts: %s (%s@%s) (%s)", msg.Prefix.Name, msg.Prefix.User, msg.Prefix.Host, reason)
154 case "KICK":
155 nick := msg.Params[1]
156 var reason string
157 if len(msg.Params) > 2 {
158 reason = msg.Params[2]
159 }
160 return fmt.Sprintf("*** %s was kicked by %s (%s)", nick, msg.Prefix.Name, reason)
161 case "QUIT":
162 var reason string
163 if len(msg.Params) > 0 {
164 reason = msg.Params[0]
165 }
166 return fmt.Sprintf("*** Quits: %s (%s@%s) (%s)", msg.Prefix.Name, msg.Prefix.User, msg.Prefix.Host, reason)
167 case "TOPIC":
168 var topic string
169 if len(msg.Params) > 1 {
170 topic = msg.Params[1]
171 }
172 return fmt.Sprintf("*** %s changes topic to '%s'", msg.Prefix.Name, topic)
173 case "MODE":
174 return fmt.Sprintf("*** %s sets mode: %s", msg.Prefix.Name, strings.Join(msg.Params[1:], " "))
175 case "NOTICE":
176 return fmt.Sprintf("-%s- %s", msg.Prefix.Name, msg.Params[1])
177 case "PRIVMSG":
178 if cmd, params, ok := parseCTCPMessage(msg); ok && cmd == "ACTION" {
179 return fmt.Sprintf("* %s %s", msg.Prefix.Name, params)
180 } else {
181 return fmt.Sprintf("<%s> %s", msg.Prefix.Name, msg.Params[1])
182 }
183 default:
184 return ""
185 }
186}
187
188func parseMessage(line, entity string, ref time.Time) (*irc.Message, time.Time, error) {
189 var hour, minute, second int
190 _, err := fmt.Sscanf(line, "[%02d:%02d:%02d] ", &hour, &minute, &second)
191 if err != nil {
192 return nil, time.Time{}, err
193 }
194 line = line[11:]
195
196 var cmd, sender, text string
197 if strings.HasPrefix(line, "<") {
198 cmd = "PRIVMSG"
199 parts := strings.SplitN(line[1:], "> ", 2)
200 if len(parts) != 2 {
201 return nil, time.Time{}, nil
202 }
203 sender, text = parts[0], parts[1]
204 } else if strings.HasPrefix(line, "-") {
205 cmd = "NOTICE"
206 parts := strings.SplitN(line[1:], "- ", 2)
207 if len(parts) != 2 {
208 return nil, time.Time{}, nil
209 }
210 sender, text = parts[0], parts[1]
211 } else if strings.HasPrefix(line, "* ") {
212 cmd = "PRIVMSG"
213 parts := strings.SplitN(line[2:], " ", 2)
214 if len(parts) != 2 {
215 return nil, time.Time{}, nil
216 }
217 sender, text = parts[0], "\x01ACTION "+parts[1]+"\x01"
218 } else {
219 return nil, time.Time{}, nil
220 }
221
222 year, month, day := ref.Date()
223 t := time.Date(year, month, day, hour, minute, second, 0, time.Local)
224
225 msg := &irc.Message{
226 Tags: map[string]irc.TagValue{
227 "time": irc.TagValue(t.UTC().Format(serverTimeLayout)),
228 },
229 Prefix: &irc.Prefix{Name: sender},
230 Command: cmd,
231 Params: []string{entity, text},
232 }
233 return msg, t, nil
234}
235
236func parseMessagesBefore(network *network, entity string, ref time.Time, limit int) ([]*irc.Message, error) {
237 path := logPath(network, entity, ref)
238 f, err := os.Open(path)
239 if err != nil {
240 if os.IsNotExist(err) {
241 return nil, nil
242 }
243 return nil, err
244 }
245 defer f.Close()
246
247 historyRing := make([]*irc.Message, limit)
248 cur := 0
249
250 sc := bufio.NewScanner(f)
251 for sc.Scan() {
252 msg, t, err := parseMessage(sc.Text(), entity, ref)
253 if err != nil {
254 return nil, err
255 } else if msg == nil {
256 continue
257 } else if !t.Before(ref) {
258 break
259 }
260
261 historyRing[cur%limit] = msg
262 cur++
263 }
264 if sc.Err() != nil {
265 return nil, sc.Err()
266 }
267
268 n := limit
269 if cur < limit {
270 n = cur
271 }
272 start := (cur - n + limit) % limit
273
274 if start+n <= limit { // ring doesnt wrap
275 return historyRing[start : start+n], nil
276 } else { // ring wraps
277 history := make([]*irc.Message, n)
278 r := copy(history, historyRing[start:])
279 copy(history[r:], historyRing[:n-r])
280 return history, nil
281 }
282}
283
284func parseMessagesAfter(network *network, entity string, ref time.Time, limit int) ([]*irc.Message, error) {
285 path := logPath(network, entity, ref)
286 f, err := os.Open(path)
287 if err != nil {
288 if os.IsNotExist(err) {
289 return nil, nil
290 }
291 return nil, err
292 }
293 defer f.Close()
294
295 var history []*irc.Message
296 sc := bufio.NewScanner(f)
297 for sc.Scan() && len(history) < limit {
298 msg, t, err := parseMessage(sc.Text(), entity, ref)
299 if err != nil {
300 return nil, err
301 } else if msg == nil || !t.After(ref) {
302 continue
303 }
304
305 history = append(history, msg)
306 }
307 if sc.Err() != nil {
308 return nil, sc.Err()
309 }
310
311 return history, nil
312}
313
314func loadHistoryBeforeTime(network *network, entity string, t time.Time, limit int) ([]*irc.Message, error) {
315 history := make([]*irc.Message, limit)
316 remaining := limit
317 tries := 0
318 for remaining > 0 && tries < messageLoggerMaxTries {
319 buf, err := parseMessagesBefore(network, entity, t, remaining)
320 if err != nil {
321 return nil, err
322 }
323 if len(buf) == 0 {
324 tries++
325 } else {
326 tries = 0
327 }
328 copy(history[remaining-len(buf):], buf)
329 remaining -= len(buf)
330 year, month, day := t.Date()
331 t = time.Date(year, month, day, 0, 0, 0, 0, t.Location()).Add(-1)
332 }
333
334 return history[remaining:], nil
335}
336
337func loadHistoryAfterTime(network *network, entity string, t time.Time, limit int) ([]*irc.Message, error) {
338 var history []*irc.Message
339 remaining := limit
340 tries := 0
341 now := time.Now()
342 for remaining > 0 && tries < messageLoggerMaxTries && t.Before(now) {
343 buf, err := parseMessagesAfter(network, entity, t, remaining)
344 if err != nil {
345 return nil, err
346 }
347 if len(buf) == 0 {
348 tries++
349 } else {
350 tries = 0
351 }
352 history = append(history, buf...)
353 remaining -= len(buf)
354 year, month, day := t.Date()
355 t = time.Date(year, month, day+1, 0, 0, 0, 0, t.Location())
356 }
357 return history, nil
358}
Note: See TracBrowser for help on using the repository browser.