Changeset 51 in code
- Timestamp:
- Feb 7, 2020, 4:35:57 PM (5 years ago)
- Location:
- trunk
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/downstream.go
r50 r51 129 129 } 130 130 u.lock.Unlock() 131 132 // TODO: figure out a better way to advance the ring buffer consumer cursor 133 u.forEachUpstream(func(uc *upstreamConn) { 134 // TODO: let clients specify the ring buffer name in their username 135 uc.ring.Consumer("").Reset() 136 }) 131 137 } 132 138 … … 233 239 } 234 240 235 consumer := uc.ring.Consumer() 241 // TODO: let clients specify the ring buffer name in their username 242 consumer := uc.ring.Consumer("") 236 243 for { 244 // TODO: these messages will get lost if the connection is closed 237 245 msg := consumer.Consume() 238 246 if msg == nil { -
trunk/ring.go
r50 r51 11 11 cap, cur uint64 12 12 13 consumers []RingConsumer13 consumers map[string]*RingConsumer 14 14 } 15 15 16 16 func NewRing(capacity int) *Ring { 17 17 return &Ring{ 18 buffer: make([]*irc.Message, capacity), 19 cap: uint64(capacity), 18 buffer: make([]*irc.Message, capacity), 19 cap: uint64(capacity), 20 consumers: make(map[string]*RingConsumer), 20 21 } 21 22 } … … 27 28 } 28 29 29 func (r *Ring) Consumer() *RingConsumer { 30 return &RingConsumer{ 30 func (r *Ring) Consumer(name string) *RingConsumer { 31 consumer, ok := r.consumers[name] 32 if ok { 33 return consumer 34 } 35 36 consumer = &RingConsumer{ 31 37 ring: r, 32 cur: 0, // r.cur38 cur: r.cur, 33 39 } 40 r.consumers[name] = consumer 41 return consumer 34 42 } 35 43 … … 70 78 return msg 71 79 } 80 81 func (rc *RingConsumer) Reset() { 82 rc.cur = rc.ring.cur 83 }
Note:
See TracChangeset
for help on using the changeset viewer.