source: code/trunk/ring.go@ 283

Last change on this file since 283 was 243, checked in by contact, 5 years ago

Remove Ring.consumers

We don't need to keep track of consumers anymore.

File size: 2.0 KB
RevLine 
[98]1package soju
[50]2
3import (
[138]4 "fmt"
[57]5
[50]6 "gopkg.in/irc.v3"
7)
8
9// Ring implements a single producer, multiple consumer ring buffer. The ring
10// buffer size is fixed. The ring buffer is stored in memory.
11type Ring struct {
[57]12 buffer []*irc.Message
13 cap uint64
[243]14 cur uint64
[50]15}
16
[59]17// NewRing creates a new ring buffer.
[50]18func NewRing(capacity int) *Ring {
19 return &Ring{
[57]20 buffer: make([]*irc.Message, capacity),
21 cap: uint64(capacity),
[50]22 }
23}
24
[59]25// Produce appends a new message to the ring buffer.
[50]26func (r *Ring) Produce(msg *irc.Message) {
27 i := int(r.cur % r.cap)
28 r.buffer[i] = msg
29 r.cur++
[57]30}
[51]31
[242]32// Cur returns the current history sequence number.
[231]33func (r *Ring) Cur() uint64 {
34 return r.cur
35}
36
[59]37// NewConsumer creates a new ring buffer consumer.
38//
[242]39// The consumer will get messages starting from the specified history sequence
40// number (see Ring.Cur).
41func (r *Ring) NewConsumer(seq uint64) *RingConsumer {
[243]42 return &RingConsumer{ring: r, cur: seq}
[50]43}
44
[59]45// RingConsumer is a ring buffer consumer.
[50]46type RingConsumer struct {
[232]47 ring *Ring
48 cur uint64
[50]49}
50
[57]51// diff returns the number of pending messages. It assumes the Ring is locked.
52func (rc *RingConsumer) diff() uint64 {
[50]53 if rc.cur > rc.ring.cur {
[138]54 panic(fmt.Sprintf("soju: consumer cursor (%v) greater than producer cursor (%v)", rc.cur, rc.ring.cur))
[50]55 }
56 return rc.ring.cur - rc.cur
57}
58
[59]59// Peek returns the next pending message if any without consuming it. A nil
60// message is returned if no message is available.
[50]61func (rc *RingConsumer) Peek() *irc.Message {
[57]62 diff := rc.diff()
[50]63 if diff == 0 {
64 return nil
65 }
66 if diff > rc.ring.cap {
67 // Consumer drops diff - cap entries
68 rc.cur = rc.ring.cur - rc.ring.cap
69 }
70 i := int(rc.cur % rc.ring.cap)
71 msg := rc.ring.buffer[i]
72 if msg == nil {
[138]73 panic(fmt.Sprintf("soju: unexpected nil ring buffer entry at index %v", i))
[50]74 }
75 return msg
76}
77
[59]78// Consume consumes and returns the next pending message. A nil message is
79// returned if no message is available.
[50]80func (rc *RingConsumer) Consume() *irc.Message {
81 msg := rc.Peek()
82 if msg != nil {
83 rc.cur++
84 }
85 return msg
86}
Note: See TracBrowser for help on using the repository browser.