source: code/trunk/ring.go@ 58

Last change on this file since 58 was 57, checked in by contact, 5 years ago

Fix issues related to Ring

  • RingConsumer is now used directly in the goroutine responsible for writing downstream messages. This ensures that messages are not consumed from the ring buffer when a write error occurs.
  • RingConsumer now has a channel attached. This allows PRIVMSG messages to always use RingConsumer, instead of also directly pushing messages to all downstream connections.
  • Multiple clients with the same history name are now supported.
  • Ring is now protected by a mutex
File size: 2.3 KB
Line 
1package jounce
2
3import (
4 "sync"
5
6 "gopkg.in/irc.v3"
7)
8
9// Ring implements a single producer, multiple consumer ring buffer. The ring
10// buffer size is fixed. The ring buffer is stored in memory.
11type Ring struct {
12 buffer []*irc.Message
13 cap uint64
14
15 lock sync.Mutex
16 cur uint64
17 consumers []*RingConsumer
18}
19
20func NewRing(capacity int) *Ring {
21 return &Ring{
22 buffer: make([]*irc.Message, capacity),
23 cap: uint64(capacity),
24 }
25}
26
27func (r *Ring) Produce(msg *irc.Message) {
28 r.lock.Lock()
29 defer r.lock.Unlock()
30
31 i := int(r.cur % r.cap)
32 r.buffer[i] = msg
33 r.cur++
34
35 for _, consumer := range r.consumers {
36 select {
37 case consumer.ch <- struct{}{}:
38 // This space is intentionally left blank
39 default:
40 // The channel already has a pending item
41 }
42 }
43}
44
45func (r *Ring) Consumer(seq *uint64) (*RingConsumer, <-chan struct{}) {
46 consumer := &RingConsumer{
47 ring: r,
48 ch: make(chan struct{}, 1),
49 }
50
51 r.lock.Lock()
52 if seq != nil {
53 consumer.cur = *seq
54 } else {
55 consumer.cur = r.cur
56 }
57 if consumer.diff() > 0 {
58 consumer.ch <- struct{}{}
59 }
60 r.consumers = append(r.consumers, consumer)
61 r.lock.Unlock()
62
63 return consumer, consumer.ch
64}
65
66type RingConsumer struct {
67 ring *Ring
68 cur uint64
69 ch chan struct{}
70 closed bool
71}
72
73// diff returns the number of pending messages. It assumes the Ring is locked.
74func (rc *RingConsumer) diff() uint64 {
75 if rc.cur > rc.ring.cur {
76 panic("jounce: consumer cursor greater than producer cursor")
77 }
78 return rc.ring.cur - rc.cur
79}
80
81func (rc *RingConsumer) Peek() *irc.Message {
82 if rc.closed {
83 panic("jounce: RingConsumer.Peek called after Close")
84 }
85
86 rc.ring.lock.Lock()
87 defer rc.ring.lock.Unlock()
88
89 diff := rc.diff()
90 if diff == 0 {
91 return nil
92 }
93 if diff > rc.ring.cap {
94 // Consumer drops diff - cap entries
95 rc.cur = rc.ring.cur - rc.ring.cap
96 }
97 i := int(rc.cur % rc.ring.cap)
98 msg := rc.ring.buffer[i]
99 if msg == nil {
100 panic("jounce: unexpected nil ring buffer entry")
101 }
102 return msg
103}
104
105func (rc *RingConsumer) Consume() *irc.Message {
106 msg := rc.Peek()
107 if msg != nil {
108 rc.cur++
109 }
110 return msg
111}
112
113func (rc *RingConsumer) Close() uint64 {
114 rc.ring.lock.Lock()
115 for i := range rc.ring.consumers {
116 if rc.ring.consumers[i] == rc {
117 rc.ring.consumers = append(rc.ring.consumers[:i], rc.ring.consumers[i+1:]...)
118 break
119 }
120 }
121 rc.ring.lock.Unlock()
122
123 close(rc.ch)
124 rc.closed = true
125 return rc.cur
126}
Note: See TracBrowser for help on using the repository browser.