Changeset 439 in code for trunk


Ignore:
Timestamp:
Jan 4, 2021, 1:24:00 PM (4 years ago)
Author:
contact
Message:

Turn messageStore into an interface

This allows for other implementations that aren't based on a filesystem.

Location:
trunk
Files:
1 added
2 edited

Legend:

Unmodified
Added
Removed
  • trunk/msgstore.go

    r423 r439  
    22
    33import (
    4         "bufio"
    5         "fmt"
    6         "io"
    7         "os"
    8         "path/filepath"
    9         "strings"
    104        "time"
    115
     
    137)
    148
    15 const messageStoreMaxTries = 100
    16 
    17 var escapeFilename = strings.NewReplacer("/", "-", "\\", "-")
    18 
    199// messageStore is a per-user store for IRC messages.
    20 type messageStore struct {
    21         root string
    22 
    23         files map[string]*os.File // indexed by entity
     10type messageStore interface {
     11        Close() error
     12        // LastMsgID queries the last message ID for the given network, entity and
     13        // date. The message ID returned may not refer to a valid message, but can be
     14        // used in history queries.
     15        LastMsgID(network *network, entity string, t time.Time) (string, error)
     16        LoadBeforeTime(network *network, entity string, t time.Time, limit int) ([]*irc.Message, error)
     17        LoadAfterTime(network *network, entity string, t time.Time, limit int) ([]*irc.Message, error)
     18        LoadLatestID(network *network, entity, id string, limit int) ([]*irc.Message, error)
     19        Append(network *network, entity string, msg *irc.Message) (id string, err error)
    2420}
    25 
    26 func newMessageStore(root, username string) *messageStore {
    27         return &messageStore{
    28                 root:  filepath.Join(root, escapeFilename.Replace(username)),
    29                 files: make(map[string]*os.File),
    30         }
    31 }
    32 
    33 func (ms *messageStore) logPath(network *network, entity string, t time.Time) string {
    34         year, month, day := t.Date()
    35         filename := fmt.Sprintf("%04d-%02d-%02d.log", year, month, day)
    36         return filepath.Join(ms.root, escapeFilename.Replace(network.GetName()), escapeFilename.Replace(entity), filename)
    37 }
    38 
    39 func parseMsgID(s string) (network, entity string, t time.Time, offset int64, err error) {
    40         var year, month, day int
    41         _, err = fmt.Sscanf(s, "%s %s %04d-%02d-%02d %d", &network, &entity, &year, &month, &day, &offset)
    42         if err != nil {
    43                 return "", "", time.Time{}, 0, fmt.Errorf("invalid message ID: %v", err)
    44         }
    45         t = time.Date(year, time.Month(month), day, 0, 0, 0, 0, time.Local)
    46         return network, entity, t, offset, nil
    47 }
    48 
    49 func formatMsgID(network, entity string, t time.Time, offset int64) string {
    50         year, month, day := t.Date()
    51         return fmt.Sprintf("%s %s %04d-%02d-%02d %d", network, entity, year, month, day, offset)
    52 }
    53 
    54 // nextMsgID queries the message ID for the next message to be written to f.
    55 func nextMsgID(network *network, entity string, t time.Time, f *os.File) (string, error) {
    56         offset, err := f.Seek(0, io.SeekEnd)
    57         if err != nil {
    58                 return "", err
    59         }
    60         return formatMsgID(network.GetName(), entity, t, offset), nil
    61 }
    62 
    63 // LastMsgID queries the last message ID for the given network, entity and
    64 // date. The message ID returned may not refer to a valid message, but can be
    65 // used in history queries.
    66 func (ms *messageStore) LastMsgID(network *network, entity string, t time.Time) (string, error) {
    67         p := ms.logPath(network, entity, t)
    68         fi, err := os.Stat(p)
    69         if os.IsNotExist(err) {
    70                 return formatMsgID(network.GetName(), entity, t, -1), nil
    71         } else if err != nil {
    72                 return "", err
    73         }
    74         return formatMsgID(network.GetName(), entity, t, fi.Size()-1), nil
    75 }
    76 
    77 func (ms *messageStore) Append(network *network, entity string, msg *irc.Message) (string, error) {
    78         s := formatMessage(msg)
    79         if s == "" {
    80                 return "", nil
    81         }
    82 
    83         var t time.Time
    84         if tag, ok := msg.Tags["time"]; ok {
    85                 var err error
    86                 t, err = time.Parse(serverTimeLayout, string(tag))
    87                 if err != nil {
    88                         return "", fmt.Errorf("failed to parse message time tag: %v", err)
    89                 }
    90                 t = t.In(time.Local)
    91         } else {
    92                 t = time.Now()
    93         }
    94 
    95         // TODO: enforce maximum open file handles (LRU cache of file handles)
    96         f := ms.files[entity]
    97 
    98         // TODO: handle non-monotonic clock behaviour
    99         path := ms.logPath(network, entity, t)
    100         if f == nil || f.Name() != path {
    101                 if f != nil {
    102                         f.Close()
    103                 }
    104 
    105                 dir := filepath.Dir(path)
    106                 if err := os.MkdirAll(dir, 0700); err != nil {
    107                         return "", fmt.Errorf("failed to create message logs directory %q: %v", dir, err)
    108                 }
    109 
    110                 var err error
    111                 f, err = os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0600)
    112                 if err != nil {
    113                         return "", fmt.Errorf("failed to open message log file %q: %v", path, err)
    114                 }
    115 
    116                 ms.files[entity] = f
    117         }
    118 
    119         msgID, err := nextMsgID(network, entity, t, f)
    120         if err != nil {
    121                 return "", fmt.Errorf("failed to generate message ID: %v", err)
    122         }
    123 
    124         _, err = fmt.Fprintf(f, "[%02d:%02d:%02d] %s\n", t.Hour(), t.Minute(), t.Second(), s)
    125         if err != nil {
    126                 return "", fmt.Errorf("failed to log message to %q: %v", f.Name(), err)
    127         }
    128 
    129         return msgID, nil
    130 }
    131 
    132 func (ms *messageStore) Close() error {
    133         var closeErr error
    134         for _, f := range ms.files {
    135                 if err := f.Close(); err != nil {
    136                         closeErr = fmt.Errorf("failed to close message store: %v", err)
    137                 }
    138         }
    139         return closeErr
    140 }
    141 
    142 // formatMessage formats a message log line. It assumes a well-formed IRC
    143 // message.
    144 func formatMessage(msg *irc.Message) string {
    145         switch strings.ToUpper(msg.Command) {
    146         case "NICK":
    147                 return fmt.Sprintf("*** %s is now known as %s", msg.Prefix.Name, msg.Params[0])
    148         case "JOIN":
    149                 return fmt.Sprintf("*** Joins: %s (%s@%s)", msg.Prefix.Name, msg.Prefix.User, msg.Prefix.Host)
    150         case "PART":
    151                 var reason string
    152                 if len(msg.Params) > 1 {
    153                         reason = msg.Params[1]
    154                 }
    155                 return fmt.Sprintf("*** Parts: %s (%s@%s) (%s)", msg.Prefix.Name, msg.Prefix.User, msg.Prefix.Host, reason)
    156         case "KICK":
    157                 nick := msg.Params[1]
    158                 var reason string
    159                 if len(msg.Params) > 2 {
    160                         reason = msg.Params[2]
    161                 }
    162                 return fmt.Sprintf("*** %s was kicked by %s (%s)", nick, msg.Prefix.Name, reason)
    163         case "QUIT":
    164                 var reason string
    165                 if len(msg.Params) > 0 {
    166                         reason = msg.Params[0]
    167                 }
    168                 return fmt.Sprintf("*** Quits: %s (%s@%s) (%s)", msg.Prefix.Name, msg.Prefix.User, msg.Prefix.Host, reason)
    169         case "TOPIC":
    170                 var topic string
    171                 if len(msg.Params) > 1 {
    172                         topic = msg.Params[1]
    173                 }
    174                 return fmt.Sprintf("*** %s changes topic to '%s'", msg.Prefix.Name, topic)
    175         case "MODE":
    176                 return fmt.Sprintf("*** %s sets mode: %s", msg.Prefix.Name, strings.Join(msg.Params[1:], " "))
    177         case "NOTICE":
    178                 return fmt.Sprintf("-%s- %s", msg.Prefix.Name, msg.Params[1])
    179         case "PRIVMSG":
    180                 if cmd, params, ok := parseCTCPMessage(msg); ok && cmd == "ACTION" {
    181                         return fmt.Sprintf("* %s %s", msg.Prefix.Name, params)
    182                 } else {
    183                         return fmt.Sprintf("<%s> %s", msg.Prefix.Name, msg.Params[1])
    184                 }
    185         default:
    186                 return ""
    187         }
    188 }
    189 
    190 func parseMessage(line, entity string, ref time.Time) (*irc.Message, time.Time, error) {
    191         var hour, minute, second int
    192         _, err := fmt.Sscanf(line, "[%02d:%02d:%02d] ", &hour, &minute, &second)
    193         if err != nil {
    194                 return nil, time.Time{}, err
    195         }
    196         line = line[11:]
    197 
    198         var cmd, sender, text string
    199         if strings.HasPrefix(line, "<") {
    200                 cmd = "PRIVMSG"
    201                 parts := strings.SplitN(line[1:], "> ", 2)
    202                 if len(parts) != 2 {
    203                         return nil, time.Time{}, nil
    204                 }
    205                 sender, text = parts[0], parts[1]
    206         } else if strings.HasPrefix(line, "-") {
    207                 cmd = "NOTICE"
    208                 parts := strings.SplitN(line[1:], "- ", 2)
    209                 if len(parts) != 2 {
    210                         return nil, time.Time{}, nil
    211                 }
    212                 sender, text = parts[0], parts[1]
    213         } else if strings.HasPrefix(line, "* ") {
    214                 cmd = "PRIVMSG"
    215                 parts := strings.SplitN(line[2:], " ", 2)
    216                 if len(parts) != 2 {
    217                         return nil, time.Time{}, nil
    218                 }
    219                 sender, text = parts[0], "\x01ACTION "+parts[1]+"\x01"
    220         } else {
    221                 return nil, time.Time{}, nil
    222         }
    223 
    224         year, month, day := ref.Date()
    225         t := time.Date(year, month, day, hour, minute, second, 0, time.Local)
    226 
    227         msg := &irc.Message{
    228                 Tags: map[string]irc.TagValue{
    229                         "time": irc.TagValue(t.UTC().Format(serverTimeLayout)),
    230                 },
    231                 Prefix:  &irc.Prefix{Name: sender},
    232                 Command: cmd,
    233                 Params:  []string{entity, text},
    234         }
    235         return msg, t, nil
    236 }
    237 
    238 func (ms *messageStore) parseMessagesBefore(network *network, entity string, ref time.Time, limit int, afterOffset int64) ([]*irc.Message, error) {
    239         path := ms.logPath(network, entity, ref)
    240         f, err := os.Open(path)
    241         if err != nil {
    242                 if os.IsNotExist(err) {
    243                         return nil, nil
    244                 }
    245                 return nil, err
    246         }
    247         defer f.Close()
    248 
    249         historyRing := make([]*irc.Message, limit)
    250         cur := 0
    251 
    252         sc := bufio.NewScanner(f)
    253 
    254         if afterOffset >= 0 {
    255                 if _, err := f.Seek(afterOffset, io.SeekStart); err != nil {
    256                         return nil, nil
    257                 }
    258                 sc.Scan() // skip till next newline
    259         }
    260 
    261         for sc.Scan() {
    262                 msg, t, err := parseMessage(sc.Text(), entity, ref)
    263                 if err != nil {
    264                         return nil, err
    265                 } else if msg == nil {
    266                         continue
    267                 } else if !t.Before(ref) {
    268                         break
    269                 }
    270 
    271                 historyRing[cur%limit] = msg
    272                 cur++
    273         }
    274         if sc.Err() != nil {
    275                 return nil, sc.Err()
    276         }
    277 
    278         n := limit
    279         if cur < limit {
    280                 n = cur
    281         }
    282         start := (cur - n + limit) % limit
    283 
    284         if start+n <= limit { // ring doesnt wrap
    285                 return historyRing[start : start+n], nil
    286         } else { // ring wraps
    287                 history := make([]*irc.Message, n)
    288                 r := copy(history, historyRing[start:])
    289                 copy(history[r:], historyRing[:n-r])
    290                 return history, nil
    291         }
    292 }
    293 
    294 func (ms *messageStore) parseMessagesAfter(network *network, entity string, ref time.Time, limit int) ([]*irc.Message, error) {
    295         path := ms.logPath(network, entity, ref)
    296         f, err := os.Open(path)
    297         if err != nil {
    298                 if os.IsNotExist(err) {
    299                         return nil, nil
    300                 }
    301                 return nil, err
    302         }
    303         defer f.Close()
    304 
    305         var history []*irc.Message
    306         sc := bufio.NewScanner(f)
    307         for sc.Scan() && len(history) < limit {
    308                 msg, t, err := parseMessage(sc.Text(), entity, ref)
    309                 if err != nil {
    310                         return nil, err
    311                 } else if msg == nil || !t.After(ref) {
    312                         continue
    313                 }
    314 
    315                 history = append(history, msg)
    316         }
    317         if sc.Err() != nil {
    318                 return nil, sc.Err()
    319         }
    320 
    321         return history, nil
    322 }
    323 
    324 func (ms *messageStore) LoadBeforeTime(network *network, entity string, t time.Time, limit int) ([]*irc.Message, error) {
    325         history := make([]*irc.Message, limit)
    326         remaining := limit
    327         tries := 0
    328         for remaining > 0 && tries < messageStoreMaxTries {
    329                 buf, err := ms.parseMessagesBefore(network, entity, t, remaining, -1)
    330                 if err != nil {
    331                         return nil, err
    332                 }
    333                 if len(buf) == 0 {
    334                         tries++
    335                 } else {
    336                         tries = 0
    337                 }
    338                 copy(history[remaining-len(buf):], buf)
    339                 remaining -= len(buf)
    340                 year, month, day := t.Date()
    341                 t = time.Date(year, month, day, 0, 0, 0, 0, t.Location()).Add(-1)
    342         }
    343 
    344         return history[remaining:], nil
    345 }
    346 
    347 func (ms *messageStore) LoadAfterTime(network *network, entity string, t time.Time, limit int) ([]*irc.Message, error) {
    348         var history []*irc.Message
    349         remaining := limit
    350         tries := 0
    351         now := time.Now()
    352         for remaining > 0 && tries < messageStoreMaxTries && t.Before(now) {
    353                 buf, err := ms.parseMessagesAfter(network, entity, t, remaining)
    354                 if err != nil {
    355                         return nil, err
    356                 }
    357                 if len(buf) == 0 {
    358                         tries++
    359                 } else {
    360                         tries = 0
    361                 }
    362                 history = append(history, buf...)
    363                 remaining -= len(buf)
    364                 year, month, day := t.Date()
    365                 t = time.Date(year, month, day+1, 0, 0, 0, 0, t.Location())
    366         }
    367         return history, nil
    368 }
    369 
    370 func truncateDay(t time.Time) time.Time {
    371         year, month, day := t.Date()
    372         return time.Date(year, month, day, 0, 0, 0, 0, t.Location())
    373 }
    374 
    375 func (ms *messageStore) LoadLatestID(network *network, entity, id string, limit int) ([]*irc.Message, error) {
    376         var afterTime time.Time
    377         var afterOffset int64
    378         if id != "" {
    379                 var idNet, idEntity string
    380                 var err error
    381                 idNet, idEntity, afterTime, afterOffset, err = parseMsgID(id)
    382                 if err != nil {
    383                         return nil, err
    384                 }
    385                 if idNet != network.GetName() || idEntity != entity {
    386                         return nil, fmt.Errorf("cannot find message ID: message ID doesn't match network/entity")
    387                 }
    388         }
    389 
    390         history := make([]*irc.Message, limit)
    391         t := time.Now()
    392         remaining := limit
    393         tries := 0
    394         for remaining > 0 && tries < messageStoreMaxTries && !truncateDay(t).Before(afterTime) {
    395                 var offset int64 = -1
    396                 if afterOffset >= 0 && truncateDay(t).Equal(afterTime) {
    397                         offset = afterOffset
    398                 }
    399 
    400                 buf, err := ms.parseMessagesBefore(network, entity, t, remaining, offset)
    401                 if err != nil {
    402                         return nil, err
    403                 }
    404                 if len(buf) == 0 {
    405                         tries++
    406                 } else {
    407                         tries = 0
    408                 }
    409                 copy(history[remaining-len(buf):], buf)
    410                 remaining -= len(buf)
    411                 year, month, day := t.Date()
    412                 t = time.Date(year, month, day, 0, 0, 0, 0, t.Location()).Add(-1)
    413         }
    414 
    415         return history[remaining:], nil
    416 }
  • trunk/user.go

    r437 r439  
    264264        networks        []*network
    265265        downstreamConns []*downstreamConn
    266         msgStore        *messageStore
     266        msgStore        messageStore
    267267
    268268        // LIST commands in progress
     
    277277
    278278func newUser(srv *Server, record *User) *user {
    279         var msgStore *messageStore
     279        var msgStore messageStore
    280280        if srv.LogPath != "" {
    281                 msgStore = newMessageStore(srv.LogPath, record.Username)
     281                msgStore = newFSMessageStore(srv.LogPath, record.Username)
    282282        }
    283283
Note: See TracChangeset for help on using the changeset viewer.