source: code/trunk/msgstore_fs.go@ 493

Last change on this file since 493 was 488, checked in by contact, 4 years ago

Use BARE for internal message IDs

This allows for shorter and more future-proof IDs. It also
guarantees that the IDs only use reasonable ASCII characters (no
spaces), removing the need to encode them for PING/PONG tokens.

File size: 11.0 KB
RevLine 
[439]1package soju
2
3import (
4 "bufio"
5 "fmt"
6 "io"
7 "os"
8 "path/filepath"
9 "strings"
10 "time"
11
[488]12 "git.sr.ht/~sircmpwn/go-bare"
[439]13 "gopkg.in/irc.v3"
14)
15
// fsMessageStoreMaxTries is the number of consecutive days without any
// message that the Load* methods scan before giving up.
const fsMessageStoreMaxTries = 100

// escapeFilename replaces path separators (forward and backward slashes) with
// dashes so that a name can be used as a single path component.
var escapeFilename = strings.NewReplacer(
	"/", "-",
	`\`, "-",
)
19
// date is a calendar day. The field order is part of the serialized message
// ID layout and must not be changed.
type date struct {
	Year  int
	Month int
	Day   int
}

// newDate extracts the calendar day of t, as seen in t's own location.
func newDate(t time.Time) date {
	y, m, d := t.Date()
	return date{Year: y, Month: int(m), Day: d}
}

// Time returns midnight at the start of d in the local time zone.
func (d date) Time() time.Time {
	month := time.Month(d.Month)
	return time.Date(d.Year, month, d.Day, 0, 0, 0, 0, time.Local)
}
32
33type fsMsgID struct {
34 Date date
35 Offset bare.Int
36}
37
38func (fsMsgID) msgIDType() msgIDType {
39 return msgIDFS
40}
41
42func parseFSMsgID(s string) (netID int64, entity string, t time.Time, offset int64, err error) {
43 var id fsMsgID
44 netID, entity, err = parseMsgID(s, &id)
45 if err != nil {
46 return 0, "", time.Time{}, 0, err
47 }
48 return netID, entity, id.Date.Time(), int64(id.Offset), nil
49}
50
51func formatFSMsgID(netID int64, entity string, t time.Time, offset int64) string {
52 id := fsMsgID{
53 Date: newDate(t),
54 Offset: bare.Int(offset),
55 }
56 return formatMsgID(netID, entity, &id)
57}
58
// fsMessageStore is a per-user on-disk store for IRC messages. Messages are
// kept in one plain-text log file per network, entity and day.
type fsMessageStore struct {
	// root is the user's log directory.
	root string

	// files holds the log file currently open for appending, indexed by
	// entity.
	files map[string]*os.File
}
65
66func newFSMessageStore(root, username string) *fsMessageStore {
67 return &fsMessageStore{
68 root: filepath.Join(root, escapeFilename.Replace(username)),
69 files: make(map[string]*os.File),
70 }
71}
72
73func (ms *fsMessageStore) logPath(network *network, entity string, t time.Time) string {
74 year, month, day := t.Date()
75 filename := fmt.Sprintf("%04d-%02d-%02d.log", year, month, day)
76 return filepath.Join(ms.root, escapeFilename.Replace(network.GetName()), escapeFilename.Replace(entity), filename)
77}
78
79// nextMsgID queries the message ID for the next message to be written to f.
[440]80func nextFSMsgID(network *network, entity string, t time.Time, f *os.File) (string, error) {
[439]81 offset, err := f.Seek(0, io.SeekEnd)
82 if err != nil {
83 return "", err
84 }
[440]85 return formatFSMsgID(network.ID, entity, t, offset), nil
[439]86}
87
88func (ms *fsMessageStore) LastMsgID(network *network, entity string, t time.Time) (string, error) {
89 p := ms.logPath(network, entity, t)
90 fi, err := os.Stat(p)
91 if os.IsNotExist(err) {
[440]92 return formatFSMsgID(network.ID, entity, t, -1), nil
[439]93 } else if err != nil {
94 return "", err
95 }
[440]96 return formatFSMsgID(network.ID, entity, t, fi.Size()-1), nil
[439]97}
98
99func (ms *fsMessageStore) Append(network *network, entity string, msg *irc.Message) (string, error) {
100 s := formatMessage(msg)
101 if s == "" {
102 return "", nil
103 }
104
105 var t time.Time
106 if tag, ok := msg.Tags["time"]; ok {
107 var err error
108 t, err = time.Parse(serverTimeLayout, string(tag))
109 if err != nil {
110 return "", fmt.Errorf("failed to parse message time tag: %v", err)
111 }
112 t = t.In(time.Local)
113 } else {
114 t = time.Now()
115 }
116
117 // TODO: enforce maximum open file handles (LRU cache of file handles)
118 f := ms.files[entity]
119
120 // TODO: handle non-monotonic clock behaviour
121 path := ms.logPath(network, entity, t)
122 if f == nil || f.Name() != path {
123 if f != nil {
124 f.Close()
125 }
126
127 dir := filepath.Dir(path)
128 if err := os.MkdirAll(dir, 0700); err != nil {
129 return "", fmt.Errorf("failed to create message logs directory %q: %v", dir, err)
130 }
131
132 var err error
133 f, err = os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0600)
134 if err != nil {
135 return "", fmt.Errorf("failed to open message log file %q: %v", path, err)
136 }
137
138 ms.files[entity] = f
139 }
140
[440]141 msgID, err := nextFSMsgID(network, entity, t, f)
[439]142 if err != nil {
143 return "", fmt.Errorf("failed to generate message ID: %v", err)
144 }
145
146 _, err = fmt.Fprintf(f, "[%02d:%02d:%02d] %s\n", t.Hour(), t.Minute(), t.Second(), s)
147 if err != nil {
148 return "", fmt.Errorf("failed to log message to %q: %v", f.Name(), err)
149 }
150
151 return msgID, nil
152}
153
154func (ms *fsMessageStore) Close() error {
155 var closeErr error
156 for _, f := range ms.files {
157 if err := f.Close(); err != nil {
158 closeErr = fmt.Errorf("failed to close message store: %v", err)
159 }
160 }
161 return closeErr
162}
163
164// formatMessage formats a message log line. It assumes a well-formed IRC
165// message.
166func formatMessage(msg *irc.Message) string {
167 switch strings.ToUpper(msg.Command) {
168 case "NICK":
169 return fmt.Sprintf("*** %s is now known as %s", msg.Prefix.Name, msg.Params[0])
170 case "JOIN":
171 return fmt.Sprintf("*** Joins: %s (%s@%s)", msg.Prefix.Name, msg.Prefix.User, msg.Prefix.Host)
172 case "PART":
173 var reason string
174 if len(msg.Params) > 1 {
175 reason = msg.Params[1]
176 }
177 return fmt.Sprintf("*** Parts: %s (%s@%s) (%s)", msg.Prefix.Name, msg.Prefix.User, msg.Prefix.Host, reason)
178 case "KICK":
179 nick := msg.Params[1]
180 var reason string
181 if len(msg.Params) > 2 {
182 reason = msg.Params[2]
183 }
184 return fmt.Sprintf("*** %s was kicked by %s (%s)", nick, msg.Prefix.Name, reason)
185 case "QUIT":
186 var reason string
187 if len(msg.Params) > 0 {
188 reason = msg.Params[0]
189 }
190 return fmt.Sprintf("*** Quits: %s (%s@%s) (%s)", msg.Prefix.Name, msg.Prefix.User, msg.Prefix.Host, reason)
191 case "TOPIC":
192 var topic string
193 if len(msg.Params) > 1 {
194 topic = msg.Params[1]
195 }
196 return fmt.Sprintf("*** %s changes topic to '%s'", msg.Prefix.Name, topic)
197 case "MODE":
198 return fmt.Sprintf("*** %s sets mode: %s", msg.Prefix.Name, strings.Join(msg.Params[1:], " "))
199 case "NOTICE":
200 return fmt.Sprintf("-%s- %s", msg.Prefix.Name, msg.Params[1])
201 case "PRIVMSG":
202 if cmd, params, ok := parseCTCPMessage(msg); ok && cmd == "ACTION" {
203 return fmt.Sprintf("* %s %s", msg.Prefix.Name, params)
204 } else {
205 return fmt.Sprintf("<%s> %s", msg.Prefix.Name, msg.Params[1])
206 }
207 default:
208 return ""
209 }
210}
211
212func parseMessage(line, entity string, ref time.Time) (*irc.Message, time.Time, error) {
213 var hour, minute, second int
214 _, err := fmt.Sscanf(line, "[%02d:%02d:%02d] ", &hour, &minute, &second)
215 if err != nil {
216 return nil, time.Time{}, err
217 }
218 line = line[11:]
219
220 var cmd, sender, text string
221 if strings.HasPrefix(line, "<") {
222 cmd = "PRIVMSG"
223 parts := strings.SplitN(line[1:], "> ", 2)
224 if len(parts) != 2 {
225 return nil, time.Time{}, nil
226 }
227 sender, text = parts[0], parts[1]
228 } else if strings.HasPrefix(line, "-") {
229 cmd = "NOTICE"
230 parts := strings.SplitN(line[1:], "- ", 2)
231 if len(parts) != 2 {
232 return nil, time.Time{}, nil
233 }
234 sender, text = parts[0], parts[1]
235 } else if strings.HasPrefix(line, "* ") {
236 cmd = "PRIVMSG"
237 parts := strings.SplitN(line[2:], " ", 2)
238 if len(parts) != 2 {
239 return nil, time.Time{}, nil
240 }
241 sender, text = parts[0], "\x01ACTION "+parts[1]+"\x01"
242 } else {
243 return nil, time.Time{}, nil
244 }
245
246 year, month, day := ref.Date()
247 t := time.Date(year, month, day, hour, minute, second, 0, time.Local)
248
249 msg := &irc.Message{
250 Tags: map[string]irc.TagValue{
251 "time": irc.TagValue(t.UTC().Format(serverTimeLayout)),
252 },
253 Prefix: &irc.Prefix{Name: sender},
254 Command: cmd,
255 Params: []string{entity, text},
256 }
257 return msg, t, nil
258}
259
260func (ms *fsMessageStore) parseMessagesBefore(network *network, entity string, ref time.Time, limit int, afterOffset int64) ([]*irc.Message, error) {
261 path := ms.logPath(network, entity, ref)
262 f, err := os.Open(path)
263 if err != nil {
264 if os.IsNotExist(err) {
265 return nil, nil
266 }
267 return nil, err
268 }
269 defer f.Close()
270
271 historyRing := make([]*irc.Message, limit)
272 cur := 0
273
274 sc := bufio.NewScanner(f)
275
276 if afterOffset >= 0 {
277 if _, err := f.Seek(afterOffset, io.SeekStart); err != nil {
278 return nil, nil
279 }
280 sc.Scan() // skip till next newline
281 }
282
283 for sc.Scan() {
284 msg, t, err := parseMessage(sc.Text(), entity, ref)
285 if err != nil {
286 return nil, err
287 } else if msg == nil {
288 continue
289 } else if !t.Before(ref) {
290 break
291 }
292
293 historyRing[cur%limit] = msg
294 cur++
295 }
296 if sc.Err() != nil {
297 return nil, sc.Err()
298 }
299
300 n := limit
301 if cur < limit {
302 n = cur
303 }
304 start := (cur - n + limit) % limit
305
306 if start+n <= limit { // ring doesnt wrap
307 return historyRing[start : start+n], nil
308 } else { // ring wraps
309 history := make([]*irc.Message, n)
310 r := copy(history, historyRing[start:])
311 copy(history[r:], historyRing[:n-r])
312 return history, nil
313 }
314}
315
316func (ms *fsMessageStore) parseMessagesAfter(network *network, entity string, ref time.Time, limit int) ([]*irc.Message, error) {
317 path := ms.logPath(network, entity, ref)
318 f, err := os.Open(path)
319 if err != nil {
320 if os.IsNotExist(err) {
321 return nil, nil
322 }
323 return nil, err
324 }
325 defer f.Close()
326
327 var history []*irc.Message
328 sc := bufio.NewScanner(f)
329 for sc.Scan() && len(history) < limit {
330 msg, t, err := parseMessage(sc.Text(), entity, ref)
331 if err != nil {
332 return nil, err
333 } else if msg == nil || !t.After(ref) {
334 continue
335 }
336
337 history = append(history, msg)
338 }
339 if sc.Err() != nil {
340 return nil, sc.Err()
341 }
342
343 return history, nil
344}
345
346func (ms *fsMessageStore) LoadBeforeTime(network *network, entity string, t time.Time, limit int) ([]*irc.Message, error) {
347 history := make([]*irc.Message, limit)
348 remaining := limit
349 tries := 0
350 for remaining > 0 && tries < fsMessageStoreMaxTries {
351 buf, err := ms.parseMessagesBefore(network, entity, t, remaining, -1)
352 if err != nil {
353 return nil, err
354 }
355 if len(buf) == 0 {
356 tries++
357 } else {
358 tries = 0
359 }
360 copy(history[remaining-len(buf):], buf)
361 remaining -= len(buf)
362 year, month, day := t.Date()
363 t = time.Date(year, month, day, 0, 0, 0, 0, t.Location()).Add(-1)
364 }
365
366 return history[remaining:], nil
367}
368
369func (ms *fsMessageStore) LoadAfterTime(network *network, entity string, t time.Time, limit int) ([]*irc.Message, error) {
370 var history []*irc.Message
371 remaining := limit
372 tries := 0
373 now := time.Now()
374 for remaining > 0 && tries < fsMessageStoreMaxTries && t.Before(now) {
375 buf, err := ms.parseMessagesAfter(network, entity, t, remaining)
376 if err != nil {
377 return nil, err
378 }
379 if len(buf) == 0 {
380 tries++
381 } else {
382 tries = 0
383 }
384 history = append(history, buf...)
385 remaining -= len(buf)
386 year, month, day := t.Date()
387 t = time.Date(year, month, day+1, 0, 0, 0, 0, t.Location())
388 }
389 return history, nil
390}
391
// truncateDay returns midnight at the start of t's calendar day, in t's own
// location.
func truncateDay(t time.Time) time.Time {
	return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, t.Location())
}
396
397func (ms *fsMessageStore) LoadLatestID(network *network, entity, id string, limit int) ([]*irc.Message, error) {
398 var afterTime time.Time
399 var afterOffset int64
400 if id != "" {
[440]401 var idNet int64
402 var idEntity string
[439]403 var err error
[440]404 idNet, idEntity, afterTime, afterOffset, err = parseFSMsgID(id)
[439]405 if err != nil {
406 return nil, err
407 }
[440]408 if idNet != network.ID || idEntity != entity {
[439]409 return nil, fmt.Errorf("cannot find message ID: message ID doesn't match network/entity")
410 }
411 }
412
413 history := make([]*irc.Message, limit)
414 t := time.Now()
415 remaining := limit
416 tries := 0
417 for remaining > 0 && tries < fsMessageStoreMaxTries && !truncateDay(t).Before(afterTime) {
418 var offset int64 = -1
419 if afterOffset >= 0 && truncateDay(t).Equal(afterTime) {
420 offset = afterOffset
421 }
422
423 buf, err := ms.parseMessagesBefore(network, entity, t, remaining, offset)
424 if err != nil {
425 return nil, err
426 }
427 if len(buf) == 0 {
428 tries++
429 } else {
430 tries = 0
431 }
432 copy(history[remaining-len(buf):], buf)
433 remaining -= len(buf)
434 year, month, day := t.Date()
435 t = time.Date(year, month, day, 0, 0, 0, 0, t.Location()).Add(-1)
436 }
437
438 return history[remaining:], nil
439}
Note: See TracBrowser for help on using the repository browser.