Changeset 57 in code for trunk/ring.go
- Timestamp:
- Feb 17, 2020, 2:46:29 PM (5 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/ring.go
r51 r57 2 2 3 3 import ( 4 "sync" 5 4 6 "gopkg.in/irc.v3" 5 7 ) … … 8 10 // buffer size is fixed. The ring buffer is stored in memory. 9 11 type Ring struct { 10 buffer 11 cap , curuint6412 buffer []*irc.Message 13 cap uint64 12 14 13 consumers map[string]*RingConsumer 15 lock sync.Mutex 16 cur uint64 17 consumers []*RingConsumer 14 18 } 15 19 16 20 func NewRing(capacity int) *Ring { 17 21 return &Ring{ 18 buffer: make([]*irc.Message, capacity), 19 cap: uint64(capacity), 20 consumers: make(map[string]*RingConsumer), 22 buffer: make([]*irc.Message, capacity), 23 cap: uint64(capacity), 21 24 } 22 25 } 23 26 24 27 func (r *Ring) Produce(msg *irc.Message) { 28 r.lock.Lock() 29 defer r.lock.Unlock() 30 25 31 i := int(r.cur % r.cap) 26 32 r.buffer[i] = msg 27 33 r.cur++ 34 35 for _, consumer := range r.consumers { 36 select { 37 case consumer.ch <- struct{}{}: 38 // This space is intentionally left blank 39 default: 40 // The channel already has a pending item 41 } 42 } 28 43 } 29 44 30 func (r *Ring) Consumer( name string) *RingConsumer{31 consumer , ok := r.consumers[name]32 if ok {33 return consumer45 func (r *Ring) Consumer(seq *uint64) (*RingConsumer, <-chan struct{}) { 46 consumer := &RingConsumer{ 47 ring: r, 48 ch: make(chan struct{}, 1), 34 49 } 35 50 36 consumer = &RingConsumer{ 37 ring: r, 38 cur: r.cur, 51 r.lock.Lock() 52 if seq != nil { 53 consumer.cur = *seq 54 } else { 55 consumer.cur = r.cur 39 56 } 40 r.consumers[name] = consumer 41 return consumer 57 if consumer.diff() > 0 { 58 consumer.ch <- struct{}{} 59 } 60 r.consumers = append(r.consumers, consumer) 61 r.lock.Unlock() 62 63 return consumer, consumer.ch 42 64 } 43 65 44 66 type RingConsumer struct { 45 ring *Ring 46 cur uint64 67 ring *Ring 68 cur uint64 69 ch chan struct{} 70 closed bool 47 71 } 48 72 49 func (rc *RingConsumer) Diff() uint64 { 73 // diff returns the number of pending messages. It assumes the Ring is locked. 74 func (rc *RingConsumer) diff() uint64 { 50 75 if rc.cur > rc.ring.cur { 51 76 panic("jounce: consumer cursor greater than producer cursor") … … 55 80 56 81 func (rc *RingConsumer) Peek() *irc.Message { 57 diff := rc.Diff() 82 if rc.closed { 83 panic("jounce: RingConsumer.Peek called after Close") 84 } 85 86 rc.ring.lock.Lock() 87 defer rc.ring.lock.Unlock() 88 89 diff := rc.diff() 58 90 if diff == 0 { 59 91 return nil … … 79 111 } 80 112 81 func (rc *RingConsumer) Reset() { 82 rc.cur = rc.ring.cur 113 func (rc *RingConsumer) Close() uint64 { 114 rc.ring.lock.Lock() 115 for i := range rc.ring.consumers { 116 if rc.ring.consumers[i] == rc { 117 rc.ring.consumers = append(rc.ring.consumers[:i], rc.ring.consumers[i+1:]...) 118 break 119 } 120 } 121 rc.ring.lock.Unlock() 122 123 close(rc.ch) 124 rc.closed = true 125 return rc.cur 83 126 }
Note:
See TracChangeset
for help on using the changeset viewer.