Changeset 204 in code for trunk


Ignore:
Timestamp:
Apr 1, 2020, 2:02:31 PM (5 years ago)
Author:
contact
Message:

Simplify ring consumer goroutine

Since network.history is now only accessed from the user goroutine, a
lock becomes unnecessary.

Location:
trunk
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • trunk/downstream.go

    r203 r204  
    7272        network     *network // can be nil
    7373
     74        ringConsumers map[*network]*RingConsumer
     75
    7476        negociatingCaps bool
    7577        capVersion      int
     
    8486func newDownstreamConn(srv *Server, netConn net.Conn, id uint64) *downstreamConn {
    8587        dc := &downstreamConn{
    86                 id:          id,
    87                 net:         netConn,
    88                 irc:         irc.NewConn(netConn),
    89                 srv:         srv,
    90                 logger:      &prefixLogger{srv.Logger, fmt.Sprintf("downstream %q: ", netConn.RemoteAddr())},
    91                 outgoing:    make(chan *irc.Message, 64),
    92                 closed:      make(chan struct{}),
    93                 caps:        make(map[string]bool),
    94                 ourMessages: make(map[*irc.Message]struct{}),
     88                id:            id,
     89                net:           netConn,
     90                irc:           irc.NewConn(netConn),
     91                srv:           srv,
     92                logger:        &prefixLogger{srv.Logger, fmt.Sprintf("downstream %q: ", netConn.RemoteAddr())},
     93                outgoing:      make(chan *irc.Message, 64),
     94                closed:        make(chan struct{}),
     95                ringConsumers: make(map[*network]*RingConsumer),
     96                caps:          make(map[string]bool),
     97                ourMessages:   make(map[*irc.Message]struct{}),
    9598        }
    9699        dc.hostname = netConn.RemoteAddr().String()
     
    723726        var seqPtr *uint64
    724727        if loadHistory {
    725                 net.lock.Lock()
    726728                seq, ok := net.history[dc.clientName]
    727                 net.lock.Unlock()
    728729                if ok {
    729730                        seqPtr = &seq
     
    736737
    737738        consumer, ch := net.ring.NewConsumer(seqPtr)
     739
     740        if _, ok := dc.ringConsumers[net]; ok {
     741                panic("network has been added twice")
     742        }
     743        dc.ringConsumers[net] = consumer
     744
    738745        go func() {
    739                 for {
    740                         var closed bool
    741                         select {
    742                         case _, ok := <-ch:
    743                                 if !ok {
    744                                         closed = true
     746                for range ch {
     747                        uc := net.upstream()
     748                        if uc == nil {
     749                                dc.logger.Printf("ignoring messages for upstream %q: upstream is disconnected", net.Addr)
     750                                continue
     751                        }
     752
     753                        for {
     754                                msg := consumer.Peek()
     755                                if msg == nil {
    745756                                        break
    746757                                }
    747758
    748                                 uc := net.upstream()
    749                                 if uc == nil {
    750                                         dc.logger.Printf("ignoring messages for upstream %q: upstream is disconnected", net.Addr)
    751                                         break
     759                                dc.lock.Lock()
     760                                _, ours := dc.ourMessages[msg]
     761                                delete(dc.ourMessages, msg)
     762                                dc.lock.Unlock()
     763                                if ours {
     764                                        // The message comes from our connection, don't echo it
     765                                        // back
     766                                        consumer.Consume()
     767                                        continue
    752768                                }
    753769
    754                                 for {
    755                                         msg := consumer.Peek()
    756                                         if msg == nil {
    757                                                 break
    758                                         }
    759 
    760                                         dc.lock.Lock()
    761                                         _, ours := dc.ourMessages[msg]
    762                                         delete(dc.ourMessages, msg)
    763                                         dc.lock.Unlock()
    764                                         if ours {
    765                                                 // The message comes from our connection, don't echo it
    766                                                 // back
    767                                                 consumer.Consume()
    768                                                 continue
    769                                         }
    770 
    771                                         msg = msg.Copy()
    772                                         switch msg.Command {
    773                                         case "PRIVMSG":
    774                                                 msg.Prefix = dc.marshalUserPrefix(uc, msg.Prefix)
    775                                                 msg.Params[0] = dc.marshalEntity(uc, msg.Params[0])
    776                                         default:
    777                                                 panic("expected to consume a PRIVMSG message")
    778                                         }
    779 
    780                                         if !msgTagsEnabled {
    781                                                 for name := range msg.Tags {
    782                                                         supported := false
    783                                                         switch name {
    784                                                         case "time":
    785                                                                 supported = serverTimeEnabled
    786                                                         }
    787                                                         if !supported {
    788                                                                 delete(msg.Tags, name)
    789                                                         }
     770                                msg = msg.Copy()
     771                                switch msg.Command {
     772                                case "PRIVMSG":
     773                                        msg.Prefix = dc.marshalUserPrefix(uc, msg.Prefix)
     774                                        msg.Params[0] = dc.marshalEntity(uc, msg.Params[0])
     775                                default:
     776                                        panic("expected to consume a PRIVMSG message")
     777                                }
     778
     779                                if !msgTagsEnabled {
     780                                        for name := range msg.Tags {
     781                                                supported := false
     782                                                switch name {
     783                                                case "time":
     784                                                        supported = serverTimeEnabled
     785                                                }
     786                                                if !supported {
     787                                                        delete(msg.Tags, name)
    790788                                                }
    791789                                        }
    792 
    793                                         dc.SendMessage(msg)
    794                                         consumer.Consume()
    795790                                }
    796                         case <-dc.closed:
    797                                 closed = true
    798                         }
    799                         if closed {
    800                                 break
    801                         }
    802                 }
    803 
    804                 // TODO: close the consumer from the user goroutine, so we don't need
    805                 // that net.history lock
    806                 seq := consumer.Close()
    807 
    808                 net.lock.Lock()
    809                 net.history[dc.clientName] = seq
    810                 net.lock.Unlock()
     791
     792                                dc.SendMessage(msg)
     793                                consumer.Consume()
     794                        }
     795                }
    811796        }()
    812797}
  • trunk/user.go

    r203 r204  
    4242        stopped chan struct{}
    4343
    44         lock    sync.Mutex
    45         conn    *upstreamConn
    4644        history map[string]uint64
     45
     46        lock sync.Mutex
     47        conn *upstreamConn
    4748}
    4849
     
    236237                case eventDownstreamDisconnected:
    237238                        dc := e.dc
     239
     240                        for net, rc := range dc.ringConsumers {
     241                                seq := rc.Close()
     242                                net.history[dc.clientName] = seq
     243                        }
     244
    238245                        for i := range u.downstreamConns {
    239246                                if u.downstreamConns[i] == dc {
Note: See TracChangeset for help on using the changeset viewer.