source: code/trunk/ring.go@ 309

Last change on this file since 309 was 243, checked in by contact, 5 years ago

Remove Ring.consumers

We don't need to keep track of consumers anymore.

File size: 2.0 KB
Line 
1package soju
2
3import (
4 "fmt"
5
6 "gopkg.in/irc.v3"
7)
8
9// Ring implements a single producer, multiple consumer ring buffer. The ring
10// buffer size is fixed. The ring buffer is stored in memory.
11type Ring struct {
12 buffer []*irc.Message
13 cap uint64
14 cur uint64
15}
16
17// NewRing creates a new ring buffer.
18func NewRing(capacity int) *Ring {
19 return &Ring{
20 buffer: make([]*irc.Message, capacity),
21 cap: uint64(capacity),
22 }
23}
24
25// Produce appends a new message to the ring buffer.
26func (r *Ring) Produce(msg *irc.Message) {
27 i := int(r.cur % r.cap)
28 r.buffer[i] = msg
29 r.cur++
30}
31
32// Cur returns the current history sequence number.
33func (r *Ring) Cur() uint64 {
34 return r.cur
35}
36
37// NewConsumer creates a new ring buffer consumer.
38//
39// The consumer will get messages starting from the specified history sequence
40// number (see Ring.Cur).
41func (r *Ring) NewConsumer(seq uint64) *RingConsumer {
42 return &RingConsumer{ring: r, cur: seq}
43}
44
45// RingConsumer is a ring buffer consumer.
46type RingConsumer struct {
47 ring *Ring
48 cur uint64
49}
50
51// diff returns the number of pending messages. It assumes the Ring is locked.
52func (rc *RingConsumer) diff() uint64 {
53 if rc.cur > rc.ring.cur {
54 panic(fmt.Sprintf("soju: consumer cursor (%v) greater than producer cursor (%v)", rc.cur, rc.ring.cur))
55 }
56 return rc.ring.cur - rc.cur
57}
58
59// Peek returns the next pending message if any without consuming it. A nil
60// message is returned if no message is available.
61func (rc *RingConsumer) Peek() *irc.Message {
62 diff := rc.diff()
63 if diff == 0 {
64 return nil
65 }
66 if diff > rc.ring.cap {
67 // Consumer drops diff - cap entries
68 rc.cur = rc.ring.cur - rc.ring.cap
69 }
70 i := int(rc.cur % rc.ring.cap)
71 msg := rc.ring.buffer[i]
72 if msg == nil {
73 panic(fmt.Sprintf("soju: unexpected nil ring buffer entry at index %v", i))
74 }
75 return msg
76}
77
78// Consume consumes and returns the next pending message. A nil message is
79// returned if no message is available.
80func (rc *RingConsumer) Consume() *irc.Message {
81 msg := rc.Peek()
82 if msg != nil {
83 rc.cur++
84 }
85 return msg
86}
Note: See TracBrowser for help on using the repository browser.