source: code/trunk/msgstore_memory.go@ 781

Last change on this file since 781 was 669, checked in by contact, 4 years ago

msgstore_memory: add comment about Append dropping messages

File size: 3.5 KB
Line 
1package soju
2
3import (
4 "context"
5 "fmt"
6 "time"
7
8 "git.sr.ht/~sircmpwn/go-bare"
9 "gopkg.in/irc.v3"
10)
11
// messageRingBufferCap is the number of slots allocated for each ring
// buffer created by the in-memory message store (4096).
const messageRingBufferCap = 1 << 12
13
14type memoryMsgID struct {
15 Seq bare.Uint
16}
17
18func (memoryMsgID) msgIDType() msgIDType {
19 return msgIDMemory
20}
21
22func parseMemoryMsgID(s string) (netID int64, entity string, seq uint64, err error) {
23 var id memoryMsgID
24 netID, entity, err = parseMsgID(s, &id)
25 if err != nil {
26 return 0, "", 0, err
27 }
28 return netID, entity, uint64(id.Seq), nil
29}
30
31func formatMemoryMsgID(netID int64, entity string, seq uint64) string {
32 id := memoryMsgID{bare.Uint(seq)}
33 return formatMsgID(netID, entity, &id)
34}
35
// ringBufferKey identifies one ring buffer of the in-memory message store.
// It is comparable, so it can be used directly as a map key.
type ringBufferKey struct {
	// networkID is the ID of the network the messages belong to.
	networkID int64
	// entity is the name the messages are filed under within that network.
	entity string
}
40
41type memoryMessageStore struct {
42 buffers map[ringBufferKey]*messageRingBuffer
43}
44
45var _ messageStore = (*memoryMessageStore)(nil)
46
47func newMemoryMessageStore() *memoryMessageStore {
48 return &memoryMessageStore{
49 buffers: make(map[ringBufferKey]*messageRingBuffer),
50 }
51}
52
53func (ms *memoryMessageStore) Close() error {
54 ms.buffers = nil
55 return nil
56}
57
58func (ms *memoryMessageStore) get(network *Network, entity string) *messageRingBuffer {
59 k := ringBufferKey{networkID: network.ID, entity: entity}
60 if rb, ok := ms.buffers[k]; ok {
61 return rb
62 }
63 rb := newMessageRingBuffer(messageRingBufferCap)
64 ms.buffers[k] = rb
65 return rb
66}
67
68func (ms *memoryMessageStore) LastMsgID(network *Network, entity string, t time.Time) (string, error) {
69 var seq uint64
70 k := ringBufferKey{networkID: network.ID, entity: entity}
71 if rb, ok := ms.buffers[k]; ok {
72 seq = rb.cur
73 }
74 return formatMemoryMsgID(network.ID, entity, seq), nil
75}
76
77func (ms *memoryMessageStore) Append(network *Network, entity string, msg *irc.Message) (string, error) {
78 switch msg.Command {
79 case "PRIVMSG", "NOTICE":
80 // Only append these messages, because LoadLatestID shouldn't return
81 // other kinds of message.
82 default:
83 return "", nil
84 }
85
86 k := ringBufferKey{networkID: network.ID, entity: entity}
87 rb, ok := ms.buffers[k]
88 if !ok {
89 rb = newMessageRingBuffer(messageRingBufferCap)
90 ms.buffers[k] = rb
91 }
92
93 seq := rb.Append(msg)
94 return formatMemoryMsgID(network.ID, entity, seq), nil
95}
96
97func (ms *memoryMessageStore) LoadLatestID(ctx context.Context, network *Network, entity, id string, limit int) ([]*irc.Message, error) {
98 _, _, seq, err := parseMemoryMsgID(id)
99 if err != nil {
100 return nil, err
101 }
102
103 k := ringBufferKey{networkID: network.ID, entity: entity}
104 rb, ok := ms.buffers[k]
105 if !ok {
106 return nil, nil
107 }
108
109 return rb.LoadLatestSeq(seq, limit)
110}
111
112type messageRingBuffer struct {
113 buf []*irc.Message
114 cur uint64
115}
116
117func newMessageRingBuffer(capacity int) *messageRingBuffer {
118 return &messageRingBuffer{
119 buf: make([]*irc.Message, capacity),
120 cur: 1,
121 }
122}
123
124func (rb *messageRingBuffer) cap() uint64 {
125 return uint64(len(rb.buf))
126}
127
128func (rb *messageRingBuffer) Append(msg *irc.Message) uint64 {
129 seq := rb.cur
130 i := int(seq % rb.cap())
131 rb.buf[i] = msg
132 rb.cur++
133 return seq
134}
135
136func (rb *messageRingBuffer) LoadLatestSeq(seq uint64, limit int) ([]*irc.Message, error) {
137 if seq > rb.cur {
138 return nil, fmt.Errorf("loading messages from sequence number (%v) greater than current (%v)", seq, rb.cur)
139 } else if seq == rb.cur {
140 return nil, nil
141 }
142
143 // The query excludes the message with the sequence number seq
144 diff := rb.cur - seq - 1
145 if diff > rb.cap() {
146 // We dropped diff - cap entries
147 diff = rb.cap()
148 }
149 if int(diff) > limit {
150 diff = uint64(limit)
151 }
152
153 l := make([]*irc.Message, int(diff))
154 for i := 0; i < int(diff); i++ {
155 j := int((rb.cur - diff + uint64(i)) % rb.cap())
156 l[i] = rb.buf[j]
157 }
158
159 return l, nil
160}
Note: See TracBrowser for help on using the repository browser.