Changeset 51 in code for trunk


Ignore:
Timestamp:
Feb 7, 2020, 4:35:57 PM (5 years ago)
Author:
contact
Message:

Add names to consumers

Location:
trunk
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • trunk/downstream.go

    r50 r51  
    129129                }
    130130                u.lock.Unlock()
     131
     132                // TODO: figure out a better way to advance the ring buffer consumer cursor
     133                u.forEachUpstream(func(uc *upstreamConn) {
     134                        // TODO: let clients specify the ring buffer name in their username
     135                        uc.ring.Consumer("").Reset()
     136                })
    131137        }
    132138
     
    233239                }
    234240
    235                 consumer := uc.ring.Consumer()
     241                // TODO: let clients specify the ring buffer name in their username
     242                consumer := uc.ring.Consumer("")
    236243                for {
     244                        // TODO: these messages will get lost if the connection is closed
    237245                        msg := consumer.Consume()
    238246                        if msg == nil {
  • trunk/ring.go

    r50 r51  
    1111        cap, cur uint64
    1212
    13         consumers []RingConsumer
     13        consumers map[string]*RingConsumer
    1414}
    1515
    1616func NewRing(capacity int) *Ring {
    1717        return &Ring{
    18                 buffer: make([]*irc.Message, capacity),
    19                 cap:    uint64(capacity),
     18                buffer:    make([]*irc.Message, capacity),
     19                cap:       uint64(capacity),
     20                consumers: make(map[string]*RingConsumer),
    2021        }
    2122}
     
    2728}
    2829
    29 func (r *Ring) Consumer() *RingConsumer {
    30         return &RingConsumer{
     30func (r *Ring) Consumer(name string) *RingConsumer {
     31        consumer, ok := r.consumers[name]
     32        if ok {
     33                return consumer
     34        }
     35
     36        consumer = &RingConsumer{
    3137                ring: r,
    32                 cur:  0, // r.cur
     38                cur:  r.cur,
    3339        }
     40        r.consumers[name] = consumer
     41        return consumer
    3442}
    3543
     
    7078        return msg
    7179}
     80
     81func (rc *RingConsumer) Reset() {
     82        rc.cur = rc.ring.cur
     83}
Note: See TracChangeset for help on using the changeset viewer.