Changeset 228 in code for trunk/ring.go
- Timestamp:
- Apr 6, 2020, 4:13:46 PM (5 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/ring.go
r203 r228 40 40 r.buffer[i] = msg 41 41 r.cur++ 42 43 for _, consumer := range r.consumers {44 select {45 case consumer.ch <- struct{}{}:46 // This space is intentionally left blank47 default:48 // The channel already has a pending item49 }50 }51 42 } 52 43 … … 57 48 if r.closed { 58 49 panic("soju: Ring.Close called twice") 59 }60 61 for _, rc := range r.consumers {62 close(rc.ch)63 50 } 64 51 … … 72 59 // from the specified history sequence number (see RingConsumer.Close). 73 60 // 74 // The returned channel yields a value each time the consumer has a new message75 // available. Consume should be called to drain the consumer.76 //77 61 // The consumer can only be used from a single goroutine. 78 func (r *Ring) NewConsumer(seq *uint64) (*RingConsumer, <-chan struct{}) { 79 consumer := &RingConsumer{ 80 ring: r, 81 ch: make(chan struct{}, 1), 82 } 62 func (r *Ring) NewConsumer(seq *uint64) *RingConsumer { 63 consumer := &RingConsumer{ring: r} 83 64 84 65 r.lock.Lock() … … 88 69 consumer.cur = r.cur 89 70 } 90 if consumer.diff() > 0 {91 consumer.ch <- struct{}{}92 }93 71 r.consumers = append(r.consumers, consumer) 94 72 r.lock.Unlock() 95 73 96 return consumer , consumer.ch74 return consumer 97 75 } 98 76 … … 101 79 ring *Ring 102 80 cur uint64 103 ch chan struct{}104 81 closed bool 105 82 } … … 162 139 rc.ring.lock.Unlock() 163 140 164 close(rc.ch)165 141 rc.closed = true 166 142 return rc.cur
Note:
See TracChangeset
for help on using the changeset viewer.