1 | package suika
|
---|
2 |
|
---|
3 | import (
|
---|
4 | "context"
|
---|
5 | "fmt"
|
---|
6 | "time"
|
---|
7 |
|
---|
8 | "git.sr.ht/~sircmpwn/go-bare"
|
---|
9 | "gopkg.in/irc.v3"
|
---|
10 | )
|
---|
11 |
|
---|
12 | const messageRingBufferCap = 4096
|
---|
13 |
|
---|
14 | type memoryMsgID struct {
|
---|
15 | Seq bare.Uint
|
---|
16 | }
|
---|
17 |
|
---|
18 | func (memoryMsgID) msgIDType() msgIDType {
|
---|
19 | return msgIDMemory
|
---|
20 | }
|
---|
21 |
|
---|
22 | func parseMemoryMsgID(s string) (netID int64, entity string, seq uint64, err error) {
|
---|
23 | var id memoryMsgID
|
---|
24 | netID, entity, err = parseMsgID(s, &id)
|
---|
25 | if err != nil {
|
---|
26 | return 0, "", 0, err
|
---|
27 | }
|
---|
28 | return netID, entity, uint64(id.Seq), nil
|
---|
29 | }
|
---|
30 |
|
---|
31 | func formatMemoryMsgID(netID int64, entity string, seq uint64) string {
|
---|
32 | id := memoryMsgID{bare.Uint(seq)}
|
---|
33 | return formatMsgID(netID, entity, &id)
|
---|
34 | }
|
---|
35 |
|
---|
36 | type ringBufferKey struct {
|
---|
37 | networkID int64
|
---|
38 | entity string
|
---|
39 | }
|
---|
40 |
|
---|
41 | type memoryMessageStore struct {
|
---|
42 | buffers map[ringBufferKey]*messageRingBuffer
|
---|
43 | }
|
---|
44 |
|
---|
45 | var _ messageStore = (*memoryMessageStore)(nil)
|
---|
46 |
|
---|
47 | func newMemoryMessageStore() *memoryMessageStore {
|
---|
48 | return &memoryMessageStore{
|
---|
49 | buffers: make(map[ringBufferKey]*messageRingBuffer),
|
---|
50 | }
|
---|
51 | }
|
---|
52 |
|
---|
53 | func (ms *memoryMessageStore) Close() error {
|
---|
54 | ms.buffers = nil
|
---|
55 | return nil
|
---|
56 | }
|
---|
57 |
|
---|
58 | func (ms *memoryMessageStore) get(network *Network, entity string) *messageRingBuffer {
|
---|
59 | k := ringBufferKey{networkID: network.ID, entity: entity}
|
---|
60 | if rb, ok := ms.buffers[k]; ok {
|
---|
61 | return rb
|
---|
62 | }
|
---|
63 | rb := newMessageRingBuffer(messageRingBufferCap)
|
---|
64 | ms.buffers[k] = rb
|
---|
65 | return rb
|
---|
66 | }
|
---|
67 |
|
---|
68 | func (ms *memoryMessageStore) LastMsgID(network *Network, entity string, t time.Time) (string, error) {
|
---|
69 | var seq uint64
|
---|
70 | k := ringBufferKey{networkID: network.ID, entity: entity}
|
---|
71 | if rb, ok := ms.buffers[k]; ok {
|
---|
72 | seq = rb.cur
|
---|
73 | }
|
---|
74 | return formatMemoryMsgID(network.ID, entity, seq), nil
|
---|
75 | }
|
---|
76 |
|
---|
77 | func (ms *memoryMessageStore) Append(network *Network, entity string, msg *irc.Message) (string, error) {
|
---|
78 | switch msg.Command {
|
---|
79 | case "PRIVMSG", "NOTICE":
|
---|
80 | // Only append these messages, because LoadLatestID shouldn't return
|
---|
81 | // other kinds of message.
|
---|
82 | default:
|
---|
83 | return "", nil
|
---|
84 | }
|
---|
85 |
|
---|
86 | k := ringBufferKey{networkID: network.ID, entity: entity}
|
---|
87 | rb, ok := ms.buffers[k]
|
---|
88 | if !ok {
|
---|
89 | rb = newMessageRingBuffer(messageRingBufferCap)
|
---|
90 | ms.buffers[k] = rb
|
---|
91 | }
|
---|
92 |
|
---|
93 | seq := rb.Append(msg)
|
---|
94 | return formatMemoryMsgID(network.ID, entity, seq), nil
|
---|
95 | }
|
---|
96 |
|
---|
97 | func (ms *memoryMessageStore) LoadLatestID(ctx context.Context, network *Network, entity, id string, limit int) ([]*irc.Message, error) {
|
---|
98 | _, _, seq, err := parseMemoryMsgID(id)
|
---|
99 | if err != nil {
|
---|
100 | return nil, err
|
---|
101 | }
|
---|
102 |
|
---|
103 | k := ringBufferKey{networkID: network.ID, entity: entity}
|
---|
104 | rb, ok := ms.buffers[k]
|
---|
105 | if !ok {
|
---|
106 | return nil, nil
|
---|
107 | }
|
---|
108 |
|
---|
109 | return rb.LoadLatestSeq(seq, limit)
|
---|
110 | }
|
---|
111 |
|
---|
112 | type messageRingBuffer struct {
|
---|
113 | buf []*irc.Message
|
---|
114 | cur uint64
|
---|
115 | }
|
---|
116 |
|
---|
117 | func newMessageRingBuffer(capacity int) *messageRingBuffer {
|
---|
118 | return &messageRingBuffer{
|
---|
119 | buf: make([]*irc.Message, capacity),
|
---|
120 | cur: 1,
|
---|
121 | }
|
---|
122 | }
|
---|
123 |
|
---|
124 | func (rb *messageRingBuffer) cap() uint64 {
|
---|
125 | return uint64(len(rb.buf))
|
---|
126 | }
|
---|
127 |
|
---|
128 | func (rb *messageRingBuffer) Append(msg *irc.Message) uint64 {
|
---|
129 | seq := rb.cur
|
---|
130 | i := int(seq % rb.cap())
|
---|
131 | rb.buf[i] = msg
|
---|
132 | rb.cur++
|
---|
133 | return seq
|
---|
134 | }
|
---|
135 |
|
---|
136 | func (rb *messageRingBuffer) LoadLatestSeq(seq uint64, limit int) ([]*irc.Message, error) {
|
---|
137 | if seq > rb.cur {
|
---|
138 | return nil, fmt.Errorf("loading messages from sequence number (%v) greater than current (%v)", seq, rb.cur)
|
---|
139 | } else if seq == rb.cur {
|
---|
140 | return nil, nil
|
---|
141 | }
|
---|
142 |
|
---|
143 | // The query excludes the message with the sequence number seq
|
---|
144 | diff := rb.cur - seq - 1
|
---|
145 | if diff > rb.cap() {
|
---|
146 | // We dropped diff - cap entries
|
---|
147 | diff = rb.cap()
|
---|
148 | }
|
---|
149 | if int(diff) > limit {
|
---|
150 | diff = uint64(limit)
|
---|
151 | }
|
---|
152 |
|
---|
153 | l := make([]*irc.Message, int(diff))
|
---|
154 | for i := 0; i < int(diff); i++ {
|
---|
155 | j := int((rb.cur - diff + uint64(i)) % rb.cap())
|
---|
156 | l[i] = rb.buf[j]
|
---|
157 | }
|
---|
158 |
|
---|
159 | return l, nil
|
---|
160 | }
|
---|