Changeset 228 in code for trunk


Ignore:
Timestamp:
Apr 6, 2020, 4:13:46 PM (5 years ago)
Author:
contact
Message:

Remove channel from ring buffer consumers

This is unused.

Location:
trunk
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • trunk/downstream.go

    r227 r228  
    695695                }
    696696
    697                 consumer, _ := net.ring.NewConsumer(seqPtr)
    698 
    699                 if _, ok := dc.ringConsumers[net]; ok {
    700                         panic("network has been added twice")
    701                 }
     697                consumer := net.ring.NewConsumer(seqPtr)
    702698                dc.ringConsumers[net] = consumer
    703699
  • trunk/ring.go

    r203 r228  
    4040        r.buffer[i] = msg
    4141        r.cur++
    42 
    43         for _, consumer := range r.consumers {
    44                 select {
    45                 case consumer.ch <- struct{}{}:
    46                         // This space is intentionally left blank
    47                 default:
    48                         // The channel already has a pending item
    49                 }
    50         }
    5142}
    5243
     
    5748        if r.closed {
    5849                panic("soju: Ring.Close called twice")
    59         }
    60 
    61         for _, rc := range r.consumers {
    62                 close(rc.ch)
    6350        }
    6451
     
    7259// from the specified history sequence number (see RingConsumer.Close).
    7360//
    74 // The returned channel yields a value each time the consumer has a new message
    75 // available. Consume should be called to drain the consumer.
    76 //
    7761// The consumer can only be used from a single goroutine.
    78 func (r *Ring) NewConsumer(seq *uint64) (*RingConsumer, <-chan struct{}) {
    79         consumer := &RingConsumer{
    80                 ring: r,
    81                 ch:   make(chan struct{}, 1),
    82         }
     62func (r *Ring) NewConsumer(seq *uint64) *RingConsumer {
     63        consumer := &RingConsumer{ring: r}
    8364
    8465        r.lock.Lock()
     
    8869                consumer.cur = r.cur
    8970        }
    90         if consumer.diff() > 0 {
    91                 consumer.ch <- struct{}{}
    92         }
    9371        r.consumers = append(r.consumers, consumer)
    9472        r.lock.Unlock()
    9573
    96         return consumer, consumer.ch
     74        return consumer
    9775}
    9876
     
    10179        ring   *Ring
    10280        cur    uint64
    103         ch     chan struct{}
    10481        closed bool
    10582}
     
    162139        rc.ring.lock.Unlock()
    163140
    164         close(rc.ch)
    165141        rc.closed = true
    166142        return rc.cur
  • trunk/user.go

    r227 r228  
    352352        u.forEachDownstream(func(dc *downstreamConn) {
    353353                if dc.network == nil {
    354                         consumer, _ := network.ring.NewConsumer(nil)
    355                         dc.ringConsumers[network] = consumer
     354                        dc.ringConsumers[network] = network.ring.NewConsumer(nil)
    356355                }
    357356        })
Note: See TracChangeset for help on using the changeset viewer.