Changeset 143 in code for trunk/downstream.go


Ignore:
Timestamp:
Mar 25, 2020, 9:53:08 AM (5 years ago)
Author:
contact
Message:

Move upstreamConn.ring to network

This handles upstream disconnection and re-connection better.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/downstream.go

    r142 r143  
    700700                        }
    701701                }
    702 
     702        })
     703
     704        dc.forEachNetwork(func(net *network) {
    703705                historyName := dc.rawUsername
    704706
     
    706708                var seqPtr *uint64
    707709                if firstDownstream {
    708                         uc.network.lock.Lock()
    709                         seq, ok := uc.network.history[historyName]
    710                         uc.network.lock.Unlock()
     710                        net.lock.Lock()
     711                        seq, ok := net.history[historyName]
     712                        net.lock.Unlock()
    711713                        if ok {
    712714                                seqPtr = &seq
     
    715717
    716718                // TODO: we need to create a consumer when adding networks on-the-fly
    717                 consumer, ch := uc.ring.NewConsumer(seqPtr)
     719                consumer, ch := net.ring.NewConsumer(seqPtr)
    718720                go func() {
    719721                        for {
     
    721723                                select {
    722724                                case <-ch:
     725                                        uc := net.upstream()
     726                                        if uc == nil {
     727                                                dc.logger.Printf("ignoring messages for upstream %q: upstream is disconnected", net.Addr)
     728                                                break
     729                                        }
    723730                                        dc.ringMessages <- ringMessage{consumer, uc}
    724731                                case <-dc.closed:
     
    738745
    739746                        if lastDownstream {
    740                                 uc.network.lock.Lock()
    741                                 uc.network.history[historyName] = seq
    742                                 uc.network.lock.Unlock()
     747                                net.lock.Lock()
     748                                net.history[historyName] = seq
     749                                net.lock.Unlock()
    743750                        }
    744751                }()
     
    10871094                        dc.lock.Unlock()
    10881095
    1089                         uc.ring.Produce(echoMsg)
     1096                        uc.network.ring.Produce(echoMsg)
    10901097                }
    10911098        default:
Note: See TracChangeset for help on using the changeset viewer.