- Timestamp:
- Jan 4, 2021, 1:24:00 PM (4 years ago)
- Location:
- trunk
- Files:
-
- 1 added
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/msgstore.go
r423 r439 2 2 3 3 import ( 4 "bufio"5 "fmt"6 "io"7 "os"8 "path/filepath"9 "strings"10 4 "time" 11 5 … … 13 7 ) 14 8 15 const messageStoreMaxTries = 10016 17 var escapeFilename = strings.NewReplacer("/", "-", "\\", "-")18 19 9 // messageStore is a per-user store for IRC messages. 20 type messageStore struct { 21 root string 22 23 files map[string]*os.File // indexed by entity 10 type messageStore interface { 11 Close() error 12 // LastMsgID queries the last message ID for the given network, entity and 13 // date. The message ID returned may not refer to a valid message, but can be 14 // used in history queries. 15 LastMsgID(network *network, entity string, t time.Time) (string, error) 16 LoadBeforeTime(network *network, entity string, t time.Time, limit int) ([]*irc.Message, error) 17 LoadAfterTime(network *network, entity string, t time.Time, limit int) ([]*irc.Message, error) 18 LoadLatestID(network *network, entity, id string, limit int) ([]*irc.Message, error) 19 Append(network *network, entity string, msg *irc.Message) (id string, err error) 24 20 } 25 26 func newMessageStore(root, username string) *messageStore {27 return &messageStore{28 root: filepath.Join(root, escapeFilename.Replace(username)),29 files: make(map[string]*os.File),30 }31 }32 33 func (ms *messageStore) logPath(network *network, entity string, t time.Time) string {34 year, month, day := t.Date()35 filename := fmt.Sprintf("%04d-%02d-%02d.log", year, month, day)36 return filepath.Join(ms.root, escapeFilename.Replace(network.GetName()), escapeFilename.Replace(entity), filename)37 }38 39 func parseMsgID(s string) (network, entity string, t time.Time, offset int64, err error) {40 var year, month, day int41 _, err = fmt.Sscanf(s, "%s %s %04d-%02d-%02d %d", &network, &entity, &year, &month, &day, &offset)42 if err != nil {43 return "", "", time.Time{}, 0, fmt.Errorf("invalid message ID: %v", err)44 }45 t = time.Date(year, time.Month(month), day, 0, 0, 0, 0, time.Local)46 return network, entity, t, offset, nil47 }48 49 func formatMsgID(network, entity string, t time.Time, offset int64) string {50 year, month, day := t.Date()51 return fmt.Sprintf("%s %s %04d-%02d-%02d %d", network, entity, year, month, day, offset)52 }53 54 // nextMsgID queries the message ID for the next message to be written to f.55 func nextMsgID(network *network, entity string, t time.Time, f *os.File) (string, error) {56 offset, err := f.Seek(0, io.SeekEnd)57 if err != nil {58 return "", err59 }60 return formatMsgID(network.GetName(), entity, t, offset), nil61 }62 63 // LastMsgID queries the last message ID for the given network, entity and64 // date. The message ID returned may not refer to a valid message, but can be65 // used in history queries.66 func (ms *messageStore) LastMsgID(network *network, entity string, t time.Time) (string, error) {67 p := ms.logPath(network, entity, t)68 fi, err := os.Stat(p)69 if os.IsNotExist(err) {70 return formatMsgID(network.GetName(), entity, t, -1), nil71 } else if err != nil {72 return "", err73 }74 return formatMsgID(network.GetName(), entity, t, fi.Size()-1), nil75 }76 77 func (ms *messageStore) Append(network *network, entity string, msg *irc.Message) (string, error) {78 s := formatMessage(msg)79 if s == "" {80 return "", nil81 }82 83 var t time.Time84 if tag, ok := msg.Tags["time"]; ok {85 var err error86 t, err = time.Parse(serverTimeLayout, string(tag))87 if err != nil {88 return "", fmt.Errorf("failed to parse message time tag: %v", err)89 }90 t = t.In(time.Local)91 } else {92 t = time.Now()93 }94 95 // TODO: enforce maximum open file handles (LRU cache of file handles)96 f := ms.files[entity]97 98 // TODO: handle non-monotonic clock behaviour99 path := ms.logPath(network, entity, t)100 if f == nil || f.Name() != path {101 if f != nil {102 f.Close()103 }104 105 dir := filepath.Dir(path)106 if err := os.MkdirAll(dir, 0700); err != nil {107 return "", fmt.Errorf("failed to create message logs directory %q: %v", dir, err)108 }109 110 var err error111 f, err = os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0600)112 if err != nil {113 return "", fmt.Errorf("failed to open message log file %q: %v", path, err)114 }115 116 ms.files[entity] = f117 }118 119 msgID, err := nextMsgID(network, entity, t, f)120 if err != nil {121 return "", fmt.Errorf("failed to generate message ID: %v", err)122 }123 124 _, err = fmt.Fprintf(f, "[%02d:%02d:%02d] %s\n", t.Hour(), t.Minute(), t.Second(), s)125 if err != nil {126 return "", fmt.Errorf("failed to log message to %q: %v", f.Name(), err)127 }128 129 return msgID, nil130 }131 132 func (ms *messageStore) Close() error {133 var closeErr error134 for _, f := range ms.files {135 if err := f.Close(); err != nil {136 closeErr = fmt.Errorf("failed to close message store: %v", err)137 }138 }139 return closeErr140 }141 142 // formatMessage formats a message log line. It assumes a well-formed IRC143 // message.144 func formatMessage(msg *irc.Message) string {145 switch strings.ToUpper(msg.Command) {146 case "NICK":147 return fmt.Sprintf("*** %s is now known as %s", msg.Prefix.Name, msg.Params[0])148 case "JOIN":149 return fmt.Sprintf("*** Joins: %s (%s@%s)", msg.Prefix.Name, msg.Prefix.User, msg.Prefix.Host)150 case "PART":151 var reason string152 if len(msg.Params) > 1 {153 reason = msg.Params[1]154 }155 return fmt.Sprintf("*** Parts: %s (%s@%s) (%s)", msg.Prefix.Name, msg.Prefix.User, msg.Prefix.Host, reason)156 case "KICK":157 nick := msg.Params[1]158 var reason string159 if len(msg.Params) > 2 {160 reason = msg.Params[2]161 }162 return fmt.Sprintf("*** %s was kicked by %s (%s)", nick, msg.Prefix.Name, reason)163 case "QUIT":164 var reason string165 if len(msg.Params) > 0 {166 reason = msg.Params[0]167 }168 return fmt.Sprintf("*** Quits: %s (%s@%s) (%s)", msg.Prefix.Name, msg.Prefix.User, msg.Prefix.Host, reason)169 case "TOPIC":170 var topic string171 if len(msg.Params) > 1 {172 topic = msg.Params[1]173 }174 return fmt.Sprintf("*** %s changes topic to '%s'", msg.Prefix.Name, topic)175 case "MODE":176 return fmt.Sprintf("*** %s sets mode: %s", msg.Prefix.Name, strings.Join(msg.Params[1:], " "))177 case "NOTICE":178 return fmt.Sprintf("-%s- %s", msg.Prefix.Name, msg.Params[1])179 case "PRIVMSG":180 if cmd, params, ok := parseCTCPMessage(msg); ok && cmd == "ACTION" {181 return fmt.Sprintf("* %s %s", msg.Prefix.Name, params)182 } else {183 return fmt.Sprintf("<%s> %s", msg.Prefix.Name, msg.Params[1])184 }185 default:186 return ""187 }188 }189 190 func parseMessage(line, entity string, ref time.Time) (*irc.Message, time.Time, error) {191 var hour, minute, second int192 _, err := fmt.Sscanf(line, "[%02d:%02d:%02d] ", &hour, &minute, &second)193 if err != nil {194 return nil, time.Time{}, err195 }196 line = line[11:]197 198 var cmd, sender, text string199 if strings.HasPrefix(line, "<") {200 cmd = "PRIVMSG"201 parts := strings.SplitN(line[1:], "> ", 2)202 if len(parts) != 2 {203 return nil, time.Time{}, nil204 }205 sender, text = parts[0], parts[1]206 } else if strings.HasPrefix(line, "-") {207 cmd = "NOTICE"208 parts := strings.SplitN(line[1:], "- ", 2)209 if len(parts) != 2 {210 return nil, time.Time{}, nil211 }212 sender, text = parts[0], parts[1]213 } else if strings.HasPrefix(line, "* ") {214 cmd = "PRIVMSG"215 parts := strings.SplitN(line[2:], " ", 2)216 if len(parts) != 2 {217 return nil, time.Time{}, nil218 }219 sender, text = parts[0], "\x01ACTION "+parts[1]+"\x01"220 } else {221 return nil, time.Time{}, nil222 }223 224 year, month, day := ref.Date()225 t := time.Date(year, month, day, hour, minute, second, 0, time.Local)226 227 msg := &irc.Message{228 Tags: map[string]irc.TagValue{229 "time": irc.TagValue(t.UTC().Format(serverTimeLayout)),230 },231 Prefix: &irc.Prefix{Name: sender},232 Command: cmd,233 Params: []string{entity, text},234 }235 return msg, t, nil236 }237 238 func (ms *messageStore) parseMessagesBefore(network *network, entity string, ref time.Time, limit int, afterOffset int64) ([]*irc.Message, error) {239 path := ms.logPath(network, entity, ref)240 f, err := os.Open(path)241 if err != nil {242 if os.IsNotExist(err) {243 return nil, nil244 }245 return nil, err246 }247 defer f.Close()248 249 historyRing := make([]*irc.Message, limit)250 cur := 0251 252 sc := bufio.NewScanner(f)253 254 if afterOffset >= 0 {255 if _, err := f.Seek(afterOffset, io.SeekStart); err != nil {256 return nil, nil257 }258 sc.Scan() // skip till next newline259 }260 261 for sc.Scan() {262 msg, t, err := parseMessage(sc.Text(), entity, ref)263 if err != nil {264 return nil, err265 } else if msg == nil {266 continue267 } else if !t.Before(ref) {268 break269 }270 271 historyRing[cur%limit] = msg272 cur++273 }274 if sc.Err() != nil {275 return nil, sc.Err()276 }277 278 n := limit279 if cur < limit {280 n = cur281 }282 start := (cur - n + limit) % limit283 284 if start+n <= limit { // ring doesnt wrap285 return historyRing[start : start+n], nil286 } else { // ring wraps287 history := make([]*irc.Message, n)288 r := copy(history, historyRing[start:])289 copy(history[r:], historyRing[:n-r])290 return history, nil291 }292 }293 294 func (ms *messageStore) parseMessagesAfter(network *network, entity string, ref time.Time, limit int) ([]*irc.Message, error) {295 path := ms.logPath(network, entity, ref)296 f, err := os.Open(path)297 if err != nil {298 if os.IsNotExist(err) {299 return nil, nil300 }301 return nil, err302 }303 defer f.Close()304 305 var history []*irc.Message306 sc := bufio.NewScanner(f)307 for sc.Scan() && len(history) < limit {308 msg, t, err := parseMessage(sc.Text(), entity, ref)309 if err != nil {310 return nil, err311 } else if msg == nil || !t.After(ref) {312 continue313 }314 315 history = append(history, msg)316 }317 if sc.Err() != nil {318 return nil, sc.Err()319 }320 321 return history, nil322 }323 324 func (ms *messageStore) LoadBeforeTime(network *network, entity string, t time.Time, limit int) ([]*irc.Message, error) {325 history := make([]*irc.Message, limit)326 remaining := limit327 tries := 0328 for remaining > 0 && tries < messageStoreMaxTries {329 buf, err := ms.parseMessagesBefore(network, entity, t, remaining, -1)330 if err != nil {331 return nil, err332 }333 if len(buf) == 0 {334 tries++335 } else {336 tries = 0337 }338 copy(history[remaining-len(buf):], buf)339 remaining -= len(buf)340 year, month, day := t.Date()341 t = time.Date(year, month, day, 0, 0, 0, 0, t.Location()).Add(-1)342 }343 344 return history[remaining:], nil345 }346 347 func (ms *messageStore) LoadAfterTime(network *network, entity string, t time.Time, limit int) ([]*irc.Message, error) {348 var history []*irc.Message349 remaining := limit350 tries := 0351 now := time.Now()352 for remaining > 0 && tries < messageStoreMaxTries && t.Before(now) {353 buf, err := ms.parseMessagesAfter(network, entity, t, remaining)354 if err != nil {355 return nil, err356 }357 if len(buf) == 0 {358 tries++359 } else {360 tries = 0361 }362 history = append(history, buf...)363 remaining -= len(buf)364 year, month, day := t.Date()365 t = time.Date(year, month, day+1, 0, 0, 0, 0, t.Location())366 }367 return history, nil368 }369 370 func truncateDay(t time.Time) time.Time {371 year, month, day := t.Date()372 return time.Date(year, month, day, 0, 0, 0, 0, t.Location())373 }374 375 func (ms *messageStore) LoadLatestID(network *network, entity, id string, limit int) ([]*irc.Message, error) {376 var afterTime time.Time377 var afterOffset int64378 if id != "" {379 var idNet, idEntity string380 var err error381 idNet, idEntity, afterTime, afterOffset, err = parseMsgID(id)382 if err != nil {383 return nil, err384 }385 if idNet != network.GetName() || idEntity != entity {386 return nil, fmt.Errorf("cannot find message ID: message ID doesn't match network/entity")387 }388 }389 390 history := make([]*irc.Message, limit)391 t := time.Now()392 remaining := limit393 tries := 0394 for remaining > 0 && tries < messageStoreMaxTries && !truncateDay(t).Before(afterTime) {395 var offset int64 = -1396 if afterOffset >= 0 && truncateDay(t).Equal(afterTime) {397 offset = afterOffset398 }399 400 buf, err := ms.parseMessagesBefore(network, entity, t, remaining, offset)401 if err != nil {402 return nil, err403 }404 if len(buf) == 0 {405 tries++406 } else {407 tries = 0408 }409 copy(history[remaining-len(buf):], buf)410 remaining -= len(buf)411 year, month, day := t.Date()412 t = time.Date(year, month, day, 0, 0, 0, 0, t.Location()).Add(-1)413 }414 415 return history[remaining:], nil416 } -
trunk/user.go
r437 r439 264 264 networks []*network 265 265 downstreamConns []*downstreamConn 266 msgStore *messageStore266 msgStore messageStore 267 267 268 268 // LIST commands in progress … … 277 277 278 278 func newUser(srv *Server, record *User) *user { 279 var msgStore *messageStore279 var msgStore messageStore 280 280 if srv.LogPath != "" { 281 msgStore = new MessageStore(srv.LogPath, record.Username)281 msgStore = newFSMessageStore(srv.LogPath, record.Username) 282 282 } 283 283
Note:
See TracChangeset
for help on using the changeset viewer.