Changeset 242 in code
- Timestamp:
- Apr 7, 2020, 12:45:08 PM (5 years ago)
- Location:
- trunk
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/downstream.go
r239 r242 662 662 663 663 dc.forEachNetwork(func(net *network) { 664 var seqPtr *uint64 665 if sendHistory { 666 seq, ok := net.history[dc.clientName] 667 if ok { 668 seqPtr = &seq 669 } 670 } 671 672 consumer := net.ring.NewConsumer(seqPtr) 664 seq, ok := net.history[dc.clientName] 665 if !sendHistory || !ok { 666 return 667 } 668 669 consumer := net.ring.NewConsumer(seq) 673 670 674 671 // TODO: this means all history is lost when trying to send it while the -
trunk/ring.go
r241 r242 32 32 } 33 33 34 // Cur returns the current history sequence number. 34 35 func (r *Ring) Cur() uint64 { 35 36 return r.cur … … 38 39 // NewConsumer creates a new ring buffer consumer. 39 40 // 40 // If seq is nil, the consumer will get messages starting from the last 41 // producer message. If seq is non-nil, the consumer will get messages starting 42 // from the specified history sequence number (see RingConsumer.Close). 43 // 44 // The consumer can only be used from a single goroutine. 45 func (r *Ring) NewConsumer(seq *uint64) *RingConsumer { 46 consumer := &RingConsumer{ring: r} 47 48 if seq != nil { 49 consumer.cur = *seq 50 } else { 51 consumer.cur = r.cur 52 } 41 // The consumer will get messages starting from the specified history sequence 42 // number (see Ring.Cur). 43 func (r *Ring) NewConsumer(seq uint64) *RingConsumer { 44 consumer := &RingConsumer{ring: r, cur: seq} 53 45 r.consumers = append(r.consumers, consumer) 54 55 46 return consumer 56 47 }
Note:
See TracChangeset
for help on using the changeset viewer.