source: code/trunk/msgstore_memory.go@ 448

Last change on this file since 448 was 444, checked in by contact, 4 years ago

go fmt

File size: 3.3 KB
RevLine 
[442]1package soju
2
3import (
4 "fmt"
5 "strconv"
6 "time"
7
8 "gopkg.in/irc.v3"
9)
10
// messageRingBufferCap is the capacity, in messages, given to every ring
// buffer created by the in-memory message store.
const messageRingBufferCap = 4096
12
13func parseMemoryMsgID(s string) (netID int64, entity string, seq uint64, err error) {
14 netID, entity, extra, err := parseMsgID(s)
15 if err != nil {
16 return 0, "", 0, err
17 }
18 seq, err = strconv.ParseUint(extra, 10, 64)
19 if err != nil {
20 return 0, "", 0, fmt.Errorf("failed to parse message ID %q: %v", s, err)
21 }
22 return netID, entity, seq, nil
23}
24
25func formatMemoryMsgID(netID int64, entity string, seq uint64) string {
26 extra := strconv.FormatUint(seq, 10)
27 return formatMsgID(netID, entity, extra)
28}
29
// ringBufferKey identifies one ring buffer inside memoryMessageStore. It is a
// comparable value type so it can be used directly as a map key.
type ringBufferKey struct {
	// networkID is the ID of the network the buffer belongs to.
	networkID int64
	// entity is the entity name the buffer belongs to.
	entity string
}
34
35type memoryMessageStore struct {
36 buffers map[ringBufferKey]*messageRingBuffer
37}
38
39func newMemoryMessageStore() *memoryMessageStore {
40 return &memoryMessageStore{
41 buffers: make(map[ringBufferKey]*messageRingBuffer),
42 }
43}
44
45func (ms *memoryMessageStore) Close() error {
46 ms.buffers = nil
47 return nil
48}
49
50func (ms *memoryMessageStore) get(network *network, entity string) *messageRingBuffer {
51 k := ringBufferKey{networkID: network.ID, entity: entity}
52 if rb, ok := ms.buffers[k]; ok {
53 return rb
54 }
55 rb := newMessageRingBuffer(messageRingBufferCap)
56 ms.buffers[k] = rb
57 return rb
58}
59
60func (ms *memoryMessageStore) LastMsgID(network *network, entity string, t time.Time) (string, error) {
61 var seq uint64
62 k := ringBufferKey{networkID: network.ID, entity: entity}
63 if rb, ok := ms.buffers[k]; ok {
64 seq = rb.cur
65 }
66 return formatMemoryMsgID(network.ID, entity, seq), nil
67}
68
69func (ms *memoryMessageStore) Append(network *network, entity string, msg *irc.Message) (string, error) {
70 k := ringBufferKey{networkID: network.ID, entity: entity}
71 rb, ok := ms.buffers[k]
72 if !ok {
73 rb = newMessageRingBuffer(messageRingBufferCap)
74 ms.buffers[k] = rb
75 }
76
77 seq := rb.Append(msg)
78 return formatMemoryMsgID(network.ID, entity, seq), nil
79}
80
81func (ms *memoryMessageStore) LoadLatestID(network *network, entity, id string, limit int) ([]*irc.Message, error) {
82 _, _, seq, err := parseMemoryMsgID(id)
83 if err != nil {
84 return nil, err
85 }
86
87 k := ringBufferKey{networkID: network.ID, entity: entity}
88 rb, ok := ms.buffers[k]
89 if !ok {
90 return nil, nil
91 }
92
93 return rb.LoadLatestSeq(seq, limit)
94}
95
96type messageRingBuffer struct {
97 buf []*irc.Message
[444]98 cur uint64
[442]99}
100
101func newMessageRingBuffer(capacity int) *messageRingBuffer {
102 return &messageRingBuffer{
103 buf: make([]*irc.Message, capacity),
104 cur: 1,
105 }
106}
107
108func (rb *messageRingBuffer) cap() uint64 {
109 return uint64(len(rb.buf))
110}
111
112func (rb *messageRingBuffer) Append(msg *irc.Message) uint64 {
113 seq := rb.cur
114 i := int(seq % rb.cap())
115 rb.buf[i] = msg
116 rb.cur++
117 return seq
118}
119
120func (rb *messageRingBuffer) LoadLatestSeq(seq uint64, limit int) ([]*irc.Message, error) {
121 if seq > rb.cur {
122 return nil, fmt.Errorf("loading messages from sequence number (%v) greater than current (%v)", seq, rb.cur)
123 } else if seq == rb.cur {
124 return nil, nil
125 }
126
127 // The query excludes the message with the sequence number seq
128 diff := rb.cur - seq - 1
129 if diff > rb.cap() {
130 // We dropped diff - cap entries
131 diff = rb.cap()
132 }
133 if int(diff) > limit {
134 diff = uint64(limit)
135 }
136
137 l := make([]*irc.Message, int(diff))
138 for i := 0; i < int(diff); i++ {
139 j := int((rb.cur - diff + uint64(i)) % rb.cap())
140 l[i] = rb.buf[j]
141 }
142
143 return l, nil
144}
Note: See TracBrowser for help on using the repository browser.