source: code/trunk/msgstore_memory.go@ 589

Last change on this file since 589 was 517, checked in by contact, 4 years ago

Check message stores implement expected interfaces

File size: 3.3 KB
Line 
1package soju
2
3import (
4 "fmt"
5 "time"
6
7 "git.sr.ht/~sircmpwn/go-bare"
8 "gopkg.in/irc.v3"
9)
10
// messageRingBufferCap is the number of message slots allocated for every
// ring buffer created by the in-memory message store (4096 entries).
const messageRingBufferCap = 1 << 12
12
// memoryMsgID is the store-specific part of a message ID produced by the
// in-memory message store. It is encoded and decoded through formatMsgID and
// parseMsgID.
type memoryMsgID struct {
	// Seq is the sequence number of the message inside its ring buffer.
	Seq bare.Uint
}
16
17func (memoryMsgID) msgIDType() msgIDType {
18 return msgIDMemory
19}
20
21func parseMemoryMsgID(s string) (netID int64, entity string, seq uint64, err error) {
22 var id memoryMsgID
23 netID, entity, err = parseMsgID(s, &id)
24 if err != nil {
25 return 0, "", 0, err
26 }
27 return netID, entity, uint64(id.Seq), nil
28}
29
30func formatMemoryMsgID(netID int64, entity string, seq uint64) string {
31 id := memoryMsgID{bare.Uint(seq)}
32 return formatMsgID(netID, entity, &id)
33}
34
// ringBufferKey identifies one ring buffer inside memoryMessageStore: there
// is one buffer per (network ID, entity) pair.
type ringBufferKey struct {
	// networkID is the ID of the network the messages belong to.
	networkID int64
	// entity is the name the messages are filed under within the network.
	entity string
}
39
// memoryMessageStore is a message store that keeps messages in memory only,
// in one fixed-size ring buffer per (network, entity) pair.
type memoryMessageStore struct {
	// buffers maps each (network ID, entity) pair to its ring buffer. Buffers
	// are created lazily; Close sets the map to nil.
	buffers map[ringBufferKey]*messageRingBuffer
}
43
// Compile-time check that *memoryMessageStore implements messageStore.
var _ messageStore = (*memoryMessageStore)(nil)
45
46func newMemoryMessageStore() *memoryMessageStore {
47 return &memoryMessageStore{
48 buffers: make(map[ringBufferKey]*messageRingBuffer),
49 }
50}
51
// Close drops every ring buffer held by the store by setting the buffer map
// to nil. It always returns a nil error.
//
// Because the map is nil afterwards, get and Append must not be called on a
// closed store: inserting into a nil map panics.
func (ms *memoryMessageStore) Close() error {
	ms.buffers = nil
	return nil
}
56
57func (ms *memoryMessageStore) get(network *network, entity string) *messageRingBuffer {
58 k := ringBufferKey{networkID: network.ID, entity: entity}
59 if rb, ok := ms.buffers[k]; ok {
60 return rb
61 }
62 rb := newMessageRingBuffer(messageRingBufferCap)
63 ms.buffers[k] = rb
64 return rb
65}
66
67func (ms *memoryMessageStore) LastMsgID(network *network, entity string, t time.Time) (string, error) {
68 var seq uint64
69 k := ringBufferKey{networkID: network.ID, entity: entity}
70 if rb, ok := ms.buffers[k]; ok {
71 seq = rb.cur
72 }
73 return formatMemoryMsgID(network.ID, entity, seq), nil
74}
75
76func (ms *memoryMessageStore) Append(network *network, entity string, msg *irc.Message) (string, error) {
77 k := ringBufferKey{networkID: network.ID, entity: entity}
78 rb, ok := ms.buffers[k]
79 if !ok {
80 rb = newMessageRingBuffer(messageRingBufferCap)
81 ms.buffers[k] = rb
82 }
83
84 seq := rb.Append(msg)
85 return formatMemoryMsgID(network.ID, entity, seq), nil
86}
87
88func (ms *memoryMessageStore) LoadLatestID(network *network, entity, id string, limit int) ([]*irc.Message, error) {
89 _, _, seq, err := parseMemoryMsgID(id)
90 if err != nil {
91 return nil, err
92 }
93
94 k := ringBufferKey{networkID: network.ID, entity: entity}
95 rb, ok := ms.buffers[k]
96 if !ok {
97 return nil, nil
98 }
99
100 return rb.LoadLatestSeq(seq, limit)
101}
102
// messageRingBuffer is a fixed-capacity buffer of messages addressed by a
// monotonically increasing sequence number. Once more messages than the
// capacity have been appended, the oldest entries are overwritten.
type messageRingBuffer struct {
	// buf holds the message slots; the message with sequence number s lives
	// at index s % len(buf).
	buf []*irc.Message
	// cur is the sequence number the next Append will assign. It starts at 1
	// in buffers made by newMessageRingBuffer.
	cur uint64
}
107
108func newMessageRingBuffer(capacity int) *messageRingBuffer {
109 return &messageRingBuffer{
110 buf: make([]*irc.Message, capacity),
111 cur: 1,
112 }
113}
114
115func (rb *messageRingBuffer) cap() uint64 {
116 return uint64(len(rb.buf))
117}
118
119func (rb *messageRingBuffer) Append(msg *irc.Message) uint64 {
120 seq := rb.cur
121 i := int(seq % rb.cap())
122 rb.buf[i] = msg
123 rb.cur++
124 return seq
125}
126
127func (rb *messageRingBuffer) LoadLatestSeq(seq uint64, limit int) ([]*irc.Message, error) {
128 if seq > rb.cur {
129 return nil, fmt.Errorf("loading messages from sequence number (%v) greater than current (%v)", seq, rb.cur)
130 } else if seq == rb.cur {
131 return nil, nil
132 }
133
134 // The query excludes the message with the sequence number seq
135 diff := rb.cur - seq - 1
136 if diff > rb.cap() {
137 // We dropped diff - cap entries
138 diff = rb.cap()
139 }
140 if int(diff) > limit {
141 diff = uint64(limit)
142 }
143
144 l := make([]*irc.Message, int(diff))
145 for i := 0; i < int(diff); i++ {
146 j := int((rb.cur - diff + uint64(i)) % rb.cap())
147 l[i] = rb.buf[j]
148 }
149
150 return l, nil
151}
Note: See TracBrowser for help on using the repository browser.