Changeset 227 in code for trunk


Ignore:
Timestamp:
Apr 6, 2020, 4:05:36 PM (5 years ago)
Author:
contact
Message:

Remove per-network ring buffer goroutines

Just dispatch from the user goroutine. This allows removes a lot of complexity.

Location:
trunk
Files:
4 edited

Legend:

Unmodified
Added
Removed
  • trunk/doc/architecture.md

    r170 r227  
    3232and downstream message handlers are called from this goroutine, thus they can
    3333safely access both upstream and downstream state.
    34 
    35 In addition to these goroutines, each downstream connection also has one
    36 goroutine per network to handle new upstream messages coming from the ring
    37 buffer.
  • trunk/downstream.go

    r226 r227  
    232232
    233233        dc.conn.SendMessage(msg)
     234}
     235
     236func (dc *downstreamConn) sendFromUpstream(msg *irc.Message, uc *upstreamConn) {
     237        dc.lock.Lock()
     238        _, ours := dc.ourMessages[msg]
     239        delete(dc.ourMessages, msg)
     240        dc.lock.Unlock()
     241        if ours && !dc.getCap("echo-message") {
     242                // The message comes from our connection, don't echo it
     243                // back
     244                return
     245        }
     246
     247        msg = msg.Copy()
     248        switch msg.Command {
     249        case "PRIVMSG", "NOTICE":
     250                msg.Prefix = dc.marshalUserPrefix(uc, msg.Prefix)
     251                msg.Params[0] = dc.marshalEntity(uc, msg.Params[0])
     252        default:
     253                panic(fmt.Sprintf("unexpected %q message", msg.Command))
     254        }
     255
     256        dc.SendMessage(msg)
    234257}
    235258
     
    664687
    665688        dc.forEachNetwork(func(net *network) {
    666                 dc.runNetwork(net, sendHistory)
     689                var seqPtr *uint64
     690                if sendHistory {
     691                        seq, ok := net.history[dc.clientName]
     692                        if ok {
     693                                seqPtr = &seq
     694                        }
     695                }
     696
     697                consumer, _ := net.ring.NewConsumer(seqPtr)
     698
     699                if _, ok := dc.ringConsumers[net]; ok {
     700                        panic("network has been added twice")
     701                }
     702                dc.ringConsumers[net] = consumer
     703
     704                // TODO: this means all history is lost when trying to send it while the
     705                // upstream is disconnected. We need to store history differently so that
     706                // we don't need access to upstreamConn to forward it to a downstream
     707                // client.
     708                uc := net.upstream()
     709                if uc == nil {
     710                        dc.logger.Printf("ignoring messages for upstream %q: upstream is disconnected", net.Addr)
     711                        return
     712                }
     713
     714                for {
     715                        msg := consumer.Peek()
     716                        if msg == nil {
     717                                break
     718                        }
     719
     720                        dc.sendFromUpstream(msg, uc)
     721                        consumer.Consume()
     722                }
    667723        })
    668724
    669725        return nil
    670 }
    671 
    672 // runNetwork starts listening for messages coming from the network's ring
    673 // buffer.
    674 //
    675 // It panics if the network is not suitable for the downstream connection.
    676 func (dc *downstreamConn) runNetwork(net *network, loadHistory bool) {
    677         if dc.network != nil && net != dc.network {
    678                 panic("network not suitable for downstream connection")
    679         }
    680 
    681         var seqPtr *uint64
    682         if loadHistory {
    683                 seq, ok := net.history[dc.clientName]
    684                 if ok {
    685                         seqPtr = &seq
    686                 }
    687         }
    688 
    689         consumer, ch := net.ring.NewConsumer(seqPtr)
    690 
    691         if _, ok := dc.ringConsumers[net]; ok {
    692                 panic("network has been added twice")
    693         }
    694         dc.ringConsumers[net] = consumer
    695 
    696         go func() {
    697                 for range ch {
    698                         uc := net.upstream()
    699                         if uc == nil {
    700                                 dc.logger.Printf("ignoring messages for upstream %q: upstream is disconnected", net.Addr)
    701                                 continue
    702                         }
    703 
    704                         for {
    705                                 msg := consumer.Peek()
    706                                 if msg == nil {
    707                                         break
    708                                 }
    709 
    710                                 dc.lock.Lock()
    711                                 _, ours := dc.ourMessages[msg]
    712                                 delete(dc.ourMessages, msg)
    713                                 dc.lock.Unlock()
    714                                 if ours && !dc.getCap("echo-message") {
    715                                         // The message comes from our connection, don't echo it
    716                                         // back
    717                                         consumer.Consume()
    718                                         continue
    719                                 }
    720 
    721                                 msg = msg.Copy()
    722                                 switch msg.Command {
    723                                 case "PRIVMSG", "NOTICE":
    724                                         msg.Prefix = dc.marshalUserPrefix(uc, msg.Prefix)
    725                                         msg.Params[0] = dc.marshalEntity(uc, msg.Params[0])
    726                                 default:
    727                                         panic(fmt.Sprintf("unexpected %q message", msg.Command))
    728                                 }
    729 
    730                                 dc.SendMessage(msg)
    731                                 consumer.Consume()
    732                         }
    733                 }
    734         }()
    735726}
    736727
  • trunk/upstream.go

    r226 r227  
    13661366func (uc *upstreamConn) produce(msg *irc.Message) {
    13671367        uc.network.ring.Produce(msg)
     1368
     1369        uc.forEachDownstream(func(dc *downstreamConn) {
     1370                dc.sendFromUpstream(dc.ringConsumers[uc.network].Consume(), uc)
     1371        })
    13681372}
    13691373
  • trunk/user.go

    r223 r227  
    352352        u.forEachDownstream(func(dc *downstreamConn) {
    353353                if dc.network == nil {
    354                         dc.runNetwork(network, false)
     354                        consumer, _ := network.ring.NewConsumer(nil)
     355                        dc.ringConsumers[network] = consumer
    355356                }
    356357        })
     
    376377                                dc.Close()
    377378                        }
     379                        delete(dc.ringConsumers, net)
    378380                })
    379381
Note: See TracChangeset for help on using the changeset viewer.