Legend:
- Unmodified
- Added
- Removed
-
trunk/downstream.go
r227 r228 695 695 } 696 696 697 consumer, _ := net.ring.NewConsumer(seqPtr) 698 699 if _, ok := dc.ringConsumers[net]; ok { 700 panic("network has been added twice") 701 } 697 consumer := net.ring.NewConsumer(seqPtr) 702 698 dc.ringConsumers[net] = consumer 703 699 -
trunk/ring.go
r203 r228 40 40 r.buffer[i] = msg 41 41 r.cur++ 42 43 for _, consumer := range r.consumers {44 select {45 case consumer.ch <- struct{}{}:46 // This space is intentionally left blank47 default:48 // The channel already has a pending item49 }50 }51 42 } 52 43 … … 57 48 if r.closed { 58 49 panic("soju: Ring.Close called twice") 59 }60 61 for _, rc := range r.consumers {62 close(rc.ch)63 50 } 64 51 … … 72 59 // from the specified history sequence number (see RingConsumer.Close). 73 60 // 74 // The returned channel yields a value each time the consumer has a new message75 // available. Consume should be called to drain the consumer.76 //77 61 // The consumer can only be used from a single goroutine. 78 func (r *Ring) NewConsumer(seq *uint64) (*RingConsumer, <-chan struct{}) { 79 consumer := &RingConsumer{ 80 ring: r, 81 ch: make(chan struct{}, 1), 82 } 62 func (r *Ring) NewConsumer(seq *uint64) *RingConsumer { 63 consumer := &RingConsumer{ring: r} 83 64 84 65 r.lock.Lock() … … 88 69 consumer.cur = r.cur 89 70 } 90 if consumer.diff() > 0 {91 consumer.ch <- struct{}{}92 }93 71 r.consumers = append(r.consumers, consumer) 94 72 r.lock.Unlock() 95 73 96 return consumer , consumer.ch74 return consumer 97 75 } 98 76 … … 101 79 ring *Ring 102 80 cur uint64 103 ch chan struct{}104 81 closed bool 105 82 } … … 162 139 rc.ring.lock.Unlock() 163 140 164 close(rc.ch)165 141 rc.closed = true 166 142 return rc.cur -
trunk/user.go
r227 r228 352 352 u.forEachDownstream(func(dc *downstreamConn) { 353 353 if dc.network == nil { 354 consumer, _ := network.ring.NewConsumer(nil) 355 dc.ringConsumers[network] = consumer 354 dc.ringConsumers[network] = network.ring.NewConsumer(nil) 356 355 } 357 356 })
Note:
See TracChangeset
for help on using the changeset viewer.