source: code/trunk/msgstore.go@ 429

Last change on this file since 429 was 423, checked in by contact, 5 years ago

Add message store abstraction

Introduce a messageStore type, which will allow for multiple
implementations (e.g. in the DB or in-memory instead of on-disk).

The message store is per-user so that we don't need to deal with locking
and it's easier to implement per-user limits.

File size: 10.9 KB
Line 
1package soju
2
3import (
4 "bufio"
5 "fmt"
6 "io"
7 "os"
8 "path/filepath"
9 "strings"
10 "time"
11
12 "gopkg.in/irc.v3"
13)
14
// messageStoreMaxTries bounds how many consecutive days without any matching
// message the history loaders walk through before they give up.
const messageStoreMaxTries = 100

// escapeFilename replaces path separators ("/" and "\") with "-" so that a
// user, network or entity name can be used as a single path component.
var escapeFilename = strings.NewReplacer(
	"/", "-",
	"\\", "-",
)
18
// messageStore keeps the IRC message logs of a single user as plain-text
// files on disk, laid out as <root>/<network>/<entity>/<YYYY-MM-DD>.log.
type messageStore struct {
	// root is the directory under which all of this user's logs live.
	root string

	// files holds the log file currently open for each entity, keyed by the
	// entity name.
	files map[string]*os.File
}
25
26func newMessageStore(root, username string) *messageStore {
27 return &messageStore{
28 root: filepath.Join(root, escapeFilename.Replace(username)),
29 files: make(map[string]*os.File),
30 }
31}
32
33func (ms *messageStore) logPath(network *network, entity string, t time.Time) string {
34 year, month, day := t.Date()
35 filename := fmt.Sprintf("%04d-%02d-%02d.log", year, month, day)
36 return filepath.Join(ms.root, escapeFilename.Replace(network.GetName()), escapeFilename.Replace(entity), filename)
37}
38
// parseMsgID decodes a message ID of the form
// "<network> <entity> <YYYY-MM-DD> <offset>", as produced by formatMsgID.
//
// The returned time is midnight of the encoded day in the local time zone.
// An error is returned when s doesn't follow the expected layout.
func parseMsgID(s string) (network, entity string, t time.Time, offset int64, err error) {
	var y, m, d int
	if _, scanErr := fmt.Sscanf(s, "%s %s %04d-%02d-%02d %d", &network, &entity, &y, &m, &d, &offset); scanErr != nil {
		return "", "", time.Time{}, 0, fmt.Errorf("invalid message ID: %v", scanErr)
	}

	midnight := time.Date(y, time.Month(m), d, 0, 0, 0, 0, time.Local)
	return network, entity, midnight, offset, nil
}
48
// formatMsgID encodes a message ID as
// "<network> <entity> <YYYY-MM-DD> <offset>". The date is the calendar day of
// t in t's own location; offset is a byte position in that day's log file.
func formatMsgID(network, entity string, t time.Time, offset int64) string {
	date := fmt.Sprintf("%04d-%02d-%02d", t.Year(), int(t.Month()), t.Day())
	return fmt.Sprintf("%s %s %s %d", network, entity, date, offset)
}
53
54// nextMsgID queries the message ID for the next message to be written to f.
55func nextMsgID(network *network, entity string, t time.Time, f *os.File) (string, error) {
56 offset, err := f.Seek(0, io.SeekEnd)
57 if err != nil {
58 return "", err
59 }
60 return formatMsgID(network.GetName(), entity, t, offset), nil
61}
62
63// LastMsgID queries the last message ID for the given network, entity and
64// date. The message ID returned may not refer to a valid message, but can be
65// used in history queries.
66func (ms *messageStore) LastMsgID(network *network, entity string, t time.Time) (string, error) {
67 p := ms.logPath(network, entity, t)
68 fi, err := os.Stat(p)
69 if os.IsNotExist(err) {
70 return formatMsgID(network.GetName(), entity, t, -1), nil
71 } else if err != nil {
72 return "", err
73 }
74 return formatMsgID(network.GetName(), entity, t, fi.Size()-1), nil
75}
76
77func (ms *messageStore) Append(network *network, entity string, msg *irc.Message) (string, error) {
78 s := formatMessage(msg)
79 if s == "" {
80 return "", nil
81 }
82
83 var t time.Time
84 if tag, ok := msg.Tags["time"]; ok {
85 var err error
86 t, err = time.Parse(serverTimeLayout, string(tag))
87 if err != nil {
88 return "", fmt.Errorf("failed to parse message time tag: %v", err)
89 }
90 t = t.In(time.Local)
91 } else {
92 t = time.Now()
93 }
94
95 // TODO: enforce maximum open file handles (LRU cache of file handles)
96 f := ms.files[entity]
97
98 // TODO: handle non-monotonic clock behaviour
99 path := ms.logPath(network, entity, t)
100 if f == nil || f.Name() != path {
101 if f != nil {
102 f.Close()
103 }
104
105 dir := filepath.Dir(path)
106 if err := os.MkdirAll(dir, 0700); err != nil {
107 return "", fmt.Errorf("failed to create message logs directory %q: %v", dir, err)
108 }
109
110 var err error
111 f, err = os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0600)
112 if err != nil {
113 return "", fmt.Errorf("failed to open message log file %q: %v", path, err)
114 }
115
116 ms.files[entity] = f
117 }
118
119 msgID, err := nextMsgID(network, entity, t, f)
120 if err != nil {
121 return "", fmt.Errorf("failed to generate message ID: %v", err)
122 }
123
124 _, err = fmt.Fprintf(f, "[%02d:%02d:%02d] %s\n", t.Hour(), t.Minute(), t.Second(), s)
125 if err != nil {
126 return "", fmt.Errorf("failed to log message to %q: %v", f.Name(), err)
127 }
128
129 return msgID, nil
130}
131
132func (ms *messageStore) Close() error {
133 var closeErr error
134 for _, f := range ms.files {
135 if err := f.Close(); err != nil {
136 closeErr = fmt.Errorf("failed to close message store: %v", err)
137 }
138 }
139 return closeErr
140}
141
142// formatMessage formats a message log line. It assumes a well-formed IRC
143// message.
144func formatMessage(msg *irc.Message) string {
145 switch strings.ToUpper(msg.Command) {
146 case "NICK":
147 return fmt.Sprintf("*** %s is now known as %s", msg.Prefix.Name, msg.Params[0])
148 case "JOIN":
149 return fmt.Sprintf("*** Joins: %s (%s@%s)", msg.Prefix.Name, msg.Prefix.User, msg.Prefix.Host)
150 case "PART":
151 var reason string
152 if len(msg.Params) > 1 {
153 reason = msg.Params[1]
154 }
155 return fmt.Sprintf("*** Parts: %s (%s@%s) (%s)", msg.Prefix.Name, msg.Prefix.User, msg.Prefix.Host, reason)
156 case "KICK":
157 nick := msg.Params[1]
158 var reason string
159 if len(msg.Params) > 2 {
160 reason = msg.Params[2]
161 }
162 return fmt.Sprintf("*** %s was kicked by %s (%s)", nick, msg.Prefix.Name, reason)
163 case "QUIT":
164 var reason string
165 if len(msg.Params) > 0 {
166 reason = msg.Params[0]
167 }
168 return fmt.Sprintf("*** Quits: %s (%s@%s) (%s)", msg.Prefix.Name, msg.Prefix.User, msg.Prefix.Host, reason)
169 case "TOPIC":
170 var topic string
171 if len(msg.Params) > 1 {
172 topic = msg.Params[1]
173 }
174 return fmt.Sprintf("*** %s changes topic to '%s'", msg.Prefix.Name, topic)
175 case "MODE":
176 return fmt.Sprintf("*** %s sets mode: %s", msg.Prefix.Name, strings.Join(msg.Params[1:], " "))
177 case "NOTICE":
178 return fmt.Sprintf("-%s- %s", msg.Prefix.Name, msg.Params[1])
179 case "PRIVMSG":
180 if cmd, params, ok := parseCTCPMessage(msg); ok && cmd == "ACTION" {
181 return fmt.Sprintf("* %s %s", msg.Prefix.Name, params)
182 } else {
183 return fmt.Sprintf("<%s> %s", msg.Prefix.Name, msg.Params[1])
184 }
185 default:
186 return ""
187 }
188}
189
190func parseMessage(line, entity string, ref time.Time) (*irc.Message, time.Time, error) {
191 var hour, minute, second int
192 _, err := fmt.Sscanf(line, "[%02d:%02d:%02d] ", &hour, &minute, &second)
193 if err != nil {
194 return nil, time.Time{}, err
195 }
196 line = line[11:]
197
198 var cmd, sender, text string
199 if strings.HasPrefix(line, "<") {
200 cmd = "PRIVMSG"
201 parts := strings.SplitN(line[1:], "> ", 2)
202 if len(parts) != 2 {
203 return nil, time.Time{}, nil
204 }
205 sender, text = parts[0], parts[1]
206 } else if strings.HasPrefix(line, "-") {
207 cmd = "NOTICE"
208 parts := strings.SplitN(line[1:], "- ", 2)
209 if len(parts) != 2 {
210 return nil, time.Time{}, nil
211 }
212 sender, text = parts[0], parts[1]
213 } else if strings.HasPrefix(line, "* ") {
214 cmd = "PRIVMSG"
215 parts := strings.SplitN(line[2:], " ", 2)
216 if len(parts) != 2 {
217 return nil, time.Time{}, nil
218 }
219 sender, text = parts[0], "\x01ACTION "+parts[1]+"\x01"
220 } else {
221 return nil, time.Time{}, nil
222 }
223
224 year, month, day := ref.Date()
225 t := time.Date(year, month, day, hour, minute, second, 0, time.Local)
226
227 msg := &irc.Message{
228 Tags: map[string]irc.TagValue{
229 "time": irc.TagValue(t.UTC().Format(serverTimeLayout)),
230 },
231 Prefix: &irc.Prefix{Name: sender},
232 Command: cmd,
233 Params: []string{entity, text},
234 }
235 return msg, t, nil
236}
237
238func (ms *messageStore) parseMessagesBefore(network *network, entity string, ref time.Time, limit int, afterOffset int64) ([]*irc.Message, error) {
239 path := ms.logPath(network, entity, ref)
240 f, err := os.Open(path)
241 if err != nil {
242 if os.IsNotExist(err) {
243 return nil, nil
244 }
245 return nil, err
246 }
247 defer f.Close()
248
249 historyRing := make([]*irc.Message, limit)
250 cur := 0
251
252 sc := bufio.NewScanner(f)
253
254 if afterOffset >= 0 {
255 if _, err := f.Seek(afterOffset, io.SeekStart); err != nil {
256 return nil, nil
257 }
258 sc.Scan() // skip till next newline
259 }
260
261 for sc.Scan() {
262 msg, t, err := parseMessage(sc.Text(), entity, ref)
263 if err != nil {
264 return nil, err
265 } else if msg == nil {
266 continue
267 } else if !t.Before(ref) {
268 break
269 }
270
271 historyRing[cur%limit] = msg
272 cur++
273 }
274 if sc.Err() != nil {
275 return nil, sc.Err()
276 }
277
278 n := limit
279 if cur < limit {
280 n = cur
281 }
282 start := (cur - n + limit) % limit
283
284 if start+n <= limit { // ring doesnt wrap
285 return historyRing[start : start+n], nil
286 } else { // ring wraps
287 history := make([]*irc.Message, n)
288 r := copy(history, historyRing[start:])
289 copy(history[r:], historyRing[:n-r])
290 return history, nil
291 }
292}
293
294func (ms *messageStore) parseMessagesAfter(network *network, entity string, ref time.Time, limit int) ([]*irc.Message, error) {
295 path := ms.logPath(network, entity, ref)
296 f, err := os.Open(path)
297 if err != nil {
298 if os.IsNotExist(err) {
299 return nil, nil
300 }
301 return nil, err
302 }
303 defer f.Close()
304
305 var history []*irc.Message
306 sc := bufio.NewScanner(f)
307 for sc.Scan() && len(history) < limit {
308 msg, t, err := parseMessage(sc.Text(), entity, ref)
309 if err != nil {
310 return nil, err
311 } else if msg == nil || !t.After(ref) {
312 continue
313 }
314
315 history = append(history, msg)
316 }
317 if sc.Err() != nil {
318 return nil, sc.Err()
319 }
320
321 return history, nil
322}
323
324func (ms *messageStore) LoadBeforeTime(network *network, entity string, t time.Time, limit int) ([]*irc.Message, error) {
325 history := make([]*irc.Message, limit)
326 remaining := limit
327 tries := 0
328 for remaining > 0 && tries < messageStoreMaxTries {
329 buf, err := ms.parseMessagesBefore(network, entity, t, remaining, -1)
330 if err != nil {
331 return nil, err
332 }
333 if len(buf) == 0 {
334 tries++
335 } else {
336 tries = 0
337 }
338 copy(history[remaining-len(buf):], buf)
339 remaining -= len(buf)
340 year, month, day := t.Date()
341 t = time.Date(year, month, day, 0, 0, 0, 0, t.Location()).Add(-1)
342 }
343
344 return history[remaining:], nil
345}
346
347func (ms *messageStore) LoadAfterTime(network *network, entity string, t time.Time, limit int) ([]*irc.Message, error) {
348 var history []*irc.Message
349 remaining := limit
350 tries := 0
351 now := time.Now()
352 for remaining > 0 && tries < messageStoreMaxTries && t.Before(now) {
353 buf, err := ms.parseMessagesAfter(network, entity, t, remaining)
354 if err != nil {
355 return nil, err
356 }
357 if len(buf) == 0 {
358 tries++
359 } else {
360 tries = 0
361 }
362 history = append(history, buf...)
363 remaining -= len(buf)
364 year, month, day := t.Date()
365 t = time.Date(year, month, day+1, 0, 0, 0, 0, t.Location())
366 }
367 return history, nil
368}
369
// truncateDay returns midnight (00:00:00.000000000) of the calendar day t
// falls on, in t's own location.
func truncateDay(t time.Time) time.Time {
	return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, t.Location())
}
374
375func (ms *messageStore) LoadLatestID(network *network, entity, id string, limit int) ([]*irc.Message, error) {
376 var afterTime time.Time
377 var afterOffset int64
378 if id != "" {
379 var idNet, idEntity string
380 var err error
381 idNet, idEntity, afterTime, afterOffset, err = parseMsgID(id)
382 if err != nil {
383 return nil, err
384 }
385 if idNet != network.GetName() || idEntity != entity {
386 return nil, fmt.Errorf("cannot find message ID: message ID doesn't match network/entity")
387 }
388 }
389
390 history := make([]*irc.Message, limit)
391 t := time.Now()
392 remaining := limit
393 tries := 0
394 for remaining > 0 && tries < messageStoreMaxTries && !truncateDay(t).Before(afterTime) {
395 var offset int64 = -1
396 if afterOffset >= 0 && truncateDay(t).Equal(afterTime) {
397 offset = afterOffset
398 }
399
400 buf, err := ms.parseMessagesBefore(network, entity, t, remaining, offset)
401 if err != nil {
402 return nil, err
403 }
404 if len(buf) == 0 {
405 tries++
406 } else {
407 tries = 0
408 }
409 copy(history[remaining-len(buf):], buf)
410 remaining -= len(buf)
411 year, month, day := t.Date()
412 t = time.Date(year, month, day, 0, 0, 0, 0, t.Location()).Add(-1)
413 }
414
415 return history[remaining:], nil
416}
Note: See TracBrowser for help on using the repository browser.