Changeset 227 in code for trunk/downstream.go


Ignore:
Timestamp:
Apr 6, 2020, 4:05:36 PM (5 years ago)
Author:
contact
Message:

Remove per-network ring buffer goroutines

Just dispatch from the user goroutine. This allows removes a lot of complexity.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/downstream.go

    r226 r227  
    232232
    233233        dc.conn.SendMessage(msg)
     234}
     235
     236func (dc *downstreamConn) sendFromUpstream(msg *irc.Message, uc *upstreamConn) {
     237        dc.lock.Lock()
     238        _, ours := dc.ourMessages[msg]
     239        delete(dc.ourMessages, msg)
     240        dc.lock.Unlock()
     241        if ours && !dc.getCap("echo-message") {
     242                // The message comes from our connection, don't echo it
     243                // back
     244                return
     245        }
     246
     247        msg = msg.Copy()
     248        switch msg.Command {
     249        case "PRIVMSG", "NOTICE":
     250                msg.Prefix = dc.marshalUserPrefix(uc, msg.Prefix)
     251                msg.Params[0] = dc.marshalEntity(uc, msg.Params[0])
     252        default:
     253                panic(fmt.Sprintf("unexpected %q message", msg.Command))
     254        }
     255
     256        dc.SendMessage(msg)
    234257}
    235258
     
    664687
    665688        dc.forEachNetwork(func(net *network) {
    666                 dc.runNetwork(net, sendHistory)
     689                var seqPtr *uint64
     690                if sendHistory {
     691                        seq, ok := net.history[dc.clientName]
     692                        if ok {
     693                                seqPtr = &seq
     694                        }
     695                }
     696
     697                consumer, _ := net.ring.NewConsumer(seqPtr)
     698
     699                if _, ok := dc.ringConsumers[net]; ok {
     700                        panic("network has been added twice")
     701                }
     702                dc.ringConsumers[net] = consumer
     703
     704                // TODO: this means all history is lost when trying to send it while the
     705                // upstream is disconnected. We need to store history differently so that
     706                // we don't need access to upstreamConn to forward it to a downstream
     707                // client.
     708                uc := net.upstream()
     709                if uc == nil {
     710                        dc.logger.Printf("ignoring messages for upstream %q: upstream is disconnected", net.Addr)
     711                        return
     712                }
     713
     714                for {
     715                        msg := consumer.Peek()
     716                        if msg == nil {
     717                                break
     718                        }
     719
     720                        dc.sendFromUpstream(msg, uc)
     721                        consumer.Consume()
     722                }
    667723        })
    668724
    669725        return nil
    670 }
    671 
    672 // runNetwork starts listening for messages coming from the network's ring
    673 // buffer.
    674 //
    675 // It panics if the network is not suitable for the downstream connection.
    676 func (dc *downstreamConn) runNetwork(net *network, loadHistory bool) {
    677         if dc.network != nil && net != dc.network {
    678                 panic("network not suitable for downstream connection")
    679         }
    680 
    681         var seqPtr *uint64
    682         if loadHistory {
    683                 seq, ok := net.history[dc.clientName]
    684                 if ok {
    685                         seqPtr = &seq
    686                 }
    687         }
    688 
    689         consumer, ch := net.ring.NewConsumer(seqPtr)
    690 
    691         if _, ok := dc.ringConsumers[net]; ok {
    692                 panic("network has been added twice")
    693         }
    694         dc.ringConsumers[net] = consumer
    695 
    696         go func() {
    697                 for range ch {
    698                         uc := net.upstream()
    699                         if uc == nil {
    700                                 dc.logger.Printf("ignoring messages for upstream %q: upstream is disconnected", net.Addr)
    701                                 continue
    702                         }
    703 
    704                         for {
    705                                 msg := consumer.Peek()
    706                                 if msg == nil {
    707                                         break
    708                                 }
    709 
    710                                 dc.lock.Lock()
    711                                 _, ours := dc.ourMessages[msg]
    712                                 delete(dc.ourMessages, msg)
    713                                 dc.lock.Unlock()
    714                                 if ours && !dc.getCap("echo-message") {
    715                                         // The message comes from our connection, don't echo it
    716                                         // back
    717                                         consumer.Consume()
    718                                         continue
    719                                 }
    720 
    721                                 msg = msg.Copy()
    722                                 switch msg.Command {
    723                                 case "PRIVMSG", "NOTICE":
    724                                         msg.Prefix = dc.marshalUserPrefix(uc, msg.Prefix)
    725                                         msg.Params[0] = dc.marshalEntity(uc, msg.Params[0])
    726                                 default:
    727                                         panic(fmt.Sprintf("unexpected %q message", msg.Command))
    728                                 }
    729 
    730                                 dc.SendMessage(msg)
    731                                 consumer.Consume()
    732                         }
    733                 }
    734         }()
    735726}
    736727
Note: See TracChangeset for help on using the changeset viewer.