Changeset 144 in code for trunk


Ignore:
Timestamp:
Mar 25, 2020, 10:28:25 AM (5 years ago)
Author:
contact
Message:

Consume ring buffer for networks added on-the-fly

Location:
trunk
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • trunk/downstream.go

    r143 r144  
    703703
    704704        dc.forEachNetwork(func(net *network) {
    705                 historyName := dc.rawUsername
    706 
    707                 // TODO: need to take dc.network into account here
    708                 var seqPtr *uint64
    709                 if firstDownstream {
    710                         net.lock.Lock()
    711                         seq, ok := net.history[historyName]
    712                         net.lock.Unlock()
    713                         if ok {
    714                                 seqPtr = &seq
    715                         }
    716                 }
    717 
    718                 // TODO: we need to create a consumer when adding networks on-the-fly
    719                 consumer, ch := net.ring.NewConsumer(seqPtr)
    720                 go func() {
    721                         for {
    722                                 var closed bool
    723                                 select {
    724                                 case <-ch:
    725                                         uc := net.upstream()
    726                                         if uc == nil {
    727                                                 dc.logger.Printf("ignoring messages for upstream %q: upstream is disconnected", net.Addr)
    728                                                 break
    729                                         }
    730                                         dc.ringMessages <- ringMessage{consumer, uc}
    731                                 case <-dc.closed:
    732                                         closed = true
    733                                 }
    734                                 if closed {
     705                // TODO: need to take dc.network into account when deciding whether or
     706                // not to load history
     707                dc.runNetwork(net, firstDownstream)
     708        })
     709
     710        return nil
     711}
     712
     713// runNetwork starts listening for messages coming from the network's ring
     714// buffer.
     715//
     716// It panics if the network is not suitable for the downstream connection.
     717func (dc *downstreamConn) runNetwork(net *network, loadHistory bool) {
     718        if dc.network != nil && net != dc.network {
     719                panic("network not suitable for downstream connection")
     720        }
     721
     722        historyName := dc.rawUsername
     723
     724        var seqPtr *uint64
     725        if loadHistory {
     726                net.lock.Lock()
     727                seq, ok := net.history[historyName]
     728                net.lock.Unlock()
     729                if ok {
     730                        seqPtr = &seq
     731                }
     732        }
     733
     734        consumer, ch := net.ring.NewConsumer(seqPtr)
     735        go func() {
     736                for {
     737                        var closed bool
     738                        select {
     739                        case <-ch:
     740                                uc := net.upstream()
     741                                if uc == nil {
     742                                        dc.logger.Printf("ignoring messages for upstream %q: upstream is disconnected", net.Addr)
    735743                                        break
    736744                                }
    737                         }
    738 
    739                         seq := consumer.Close()
    740 
    741                         // TODO: need to take dc.network into account here
    742                         dc.user.lock.Lock()
    743                         lastDownstream := len(dc.user.downstreamConns) == 0
    744                         dc.user.lock.Unlock()
    745 
    746                         if lastDownstream {
    747                                 net.lock.Lock()
    748                                 net.history[historyName] = seq
    749                                 net.lock.Unlock()
    750                         }
    751                 }()
    752         })
    753 
    754         return nil
     745                                dc.ringMessages <- ringMessage{consumer, uc}
     746                        case <-dc.closed:
     747                                closed = true
     748                        }
     749                        if closed {
     750                                break
     751                        }
     752                }
     753
     754                seq := consumer.Close()
     755
     756                // TODO: need to take dc.network into account here
     757                dc.user.lock.Lock()
     758                lastDownstream := len(dc.user.downstreamConns) == 0
     759                dc.user.lock.Unlock()
     760
     761                if lastDownstream {
     762                        net.lock.Lock()
     763                        net.history[historyName] = seq
     764                        net.lock.Unlock()
     765                }
     766        }()
    755767}
    756768
  • trunk/user.go

    r143 r144  
    199199
    200200func (u *user) createNetwork(net *Network) (*network, error) {
     201        if net.ID != 0 {
     202                panic("tried creating an already-existing network")
     203        }
     204
    201205        network := newNetwork(u, net)
    202206        err := u.srv.db.StoreNetwork(u.Username, &network.Network)
     
    204208                return nil, err
    205209        }
     210
     211        u.forEachDownstream(func(dc *downstreamConn) {
     212                if dc.network == nil {
     213                        dc.runNetwork(network, false)
     214                }
     215        })
     216
    206217        u.lock.Lock()
    207218        u.networks = append(u.networks, network)
    208219        u.lock.Unlock()
     220
    209221        go network.run()
    210222        return network, nil
Note: See TracChangeset for help on using the changeset viewer.