source: code/trunk/msgstore_memory.go@ 665

Last change on this file since 665 was 665, checked in by delthas, 4 years ago

Add support for draft/event-playback

File size: 3.4 KB
Line 
1package soju
2
3import (
4 "fmt"
5 "time"
6
7 "git.sr.ht/~sircmpwn/go-bare"
8 "gopkg.in/irc.v3"
9)
10
// messageRingBufferCap is the number of message slots allocated for each
// per-entity ring buffer created by the in-memory message store.
const messageRingBufferCap = 1 << 12
12
13type memoryMsgID struct {
14 Seq bare.Uint
15}
16
17func (memoryMsgID) msgIDType() msgIDType {
18 return msgIDMemory
19}
20
21func parseMemoryMsgID(s string) (netID int64, entity string, seq uint64, err error) {
22 var id memoryMsgID
23 netID, entity, err = parseMsgID(s, &id)
24 if err != nil {
25 return 0, "", 0, err
26 }
27 return netID, entity, uint64(id.Seq), nil
28}
29
30func formatMemoryMsgID(netID int64, entity string, seq uint64) string {
31 id := memoryMsgID{bare.Uint(seq)}
32 return formatMsgID(netID, entity, &id)
33}
34
// ringBufferKey identifies one ring buffer inside the in-memory message
// store: a buffer is kept per (network ID, entity) pair. The type is
// comparable so it can be used directly as a map key.
type ringBufferKey struct {
	networkID int64  // ID of the network the buffer belongs to
	entity    string // entity name the messages were stored under
}
39
40type memoryMessageStore struct {
41 buffers map[ringBufferKey]*messageRingBuffer
42}
43
44var _ messageStore = (*memoryMessageStore)(nil)
45
46func newMemoryMessageStore() *memoryMessageStore {
47 return &memoryMessageStore{
48 buffers: make(map[ringBufferKey]*messageRingBuffer),
49 }
50}
51
52func (ms *memoryMessageStore) Close() error {
53 ms.buffers = nil
54 return nil
55}
56
57func (ms *memoryMessageStore) get(network *network, entity string) *messageRingBuffer {
58 k := ringBufferKey{networkID: network.ID, entity: entity}
59 if rb, ok := ms.buffers[k]; ok {
60 return rb
61 }
62 rb := newMessageRingBuffer(messageRingBufferCap)
63 ms.buffers[k] = rb
64 return rb
65}
66
67func (ms *memoryMessageStore) LastMsgID(network *network, entity string, t time.Time) (string, error) {
68 var seq uint64
69 k := ringBufferKey{networkID: network.ID, entity: entity}
70 if rb, ok := ms.buffers[k]; ok {
71 seq = rb.cur
72 }
73 return formatMemoryMsgID(network.ID, entity, seq), nil
74}
75
76func (ms *memoryMessageStore) Append(network *network, entity string, msg *irc.Message) (string, error) {
77 switch msg.Command {
78 case "PRIVMSG", "NOTICE":
79 default:
80 return "", nil
81 }
82
83 k := ringBufferKey{networkID: network.ID, entity: entity}
84 rb, ok := ms.buffers[k]
85 if !ok {
86 rb = newMessageRingBuffer(messageRingBufferCap)
87 ms.buffers[k] = rb
88 }
89
90 seq := rb.Append(msg)
91 return formatMemoryMsgID(network.ID, entity, seq), nil
92}
93
94func (ms *memoryMessageStore) LoadLatestID(network *network, entity, id string, limit int) ([]*irc.Message, error) {
95 _, _, seq, err := parseMemoryMsgID(id)
96 if err != nil {
97 return nil, err
98 }
99
100 k := ringBufferKey{networkID: network.ID, entity: entity}
101 rb, ok := ms.buffers[k]
102 if !ok {
103 return nil, nil
104 }
105
106 return rb.LoadLatestSeq(seq, limit)
107}
108
109type messageRingBuffer struct {
110 buf []*irc.Message
111 cur uint64
112}
113
114func newMessageRingBuffer(capacity int) *messageRingBuffer {
115 return &messageRingBuffer{
116 buf: make([]*irc.Message, capacity),
117 cur: 1,
118 }
119}
120
121func (rb *messageRingBuffer) cap() uint64 {
122 return uint64(len(rb.buf))
123}
124
125func (rb *messageRingBuffer) Append(msg *irc.Message) uint64 {
126 seq := rb.cur
127 i := int(seq % rb.cap())
128 rb.buf[i] = msg
129 rb.cur++
130 return seq
131}
132
133func (rb *messageRingBuffer) LoadLatestSeq(seq uint64, limit int) ([]*irc.Message, error) {
134 if seq > rb.cur {
135 return nil, fmt.Errorf("loading messages from sequence number (%v) greater than current (%v)", seq, rb.cur)
136 } else if seq == rb.cur {
137 return nil, nil
138 }
139
140 // The query excludes the message with the sequence number seq
141 diff := rb.cur - seq - 1
142 if diff > rb.cap() {
143 // We dropped diff - cap entries
144 diff = rb.cap()
145 }
146 if int(diff) > limit {
147 diff = uint64(limit)
148 }
149
150 l := make([]*irc.Message, int(diff))
151 for i := 0; i < int(diff); i++ {
152 j := int((rb.cur - diff + uint64(i)) % rb.cap())
153 l[i] = rb.buf[j]
154 }
155
156 return l, nil
157}
Note: See TracBrowser for help on using the repository browser.