Changeset 209 in code for trunk


Ignore:
Timestamp:
Apr 3, 2020, 2:15:25 PM (5 years ago)
Author:
contact
Message:

Fix writer goroutine races

Any SendMessage call after Close could potentially block forever if the
outgoing channel was filled up. Now the channel is drained before the
writer goroutine exits.

Location:
trunk
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • trunk/downstream.go

    r207 r209  
    5858        srv      *Server
    5959        logger   Logger
    60         outgoing chan *irc.Message
     60        outgoing chan<- *irc.Message
    6161        closed   chan struct{}
    6262
     
    8585
    8686func newDownstreamConn(srv *Server, netConn net.Conn, id uint64) *downstreamConn {
     87        outgoing := make(chan *irc.Message, 64)
    8788        dc := &downstreamConn{
    8889                id:            id,
     
    9192                srv:           srv,
    9293                logger:        &prefixLogger{srv.Logger, fmt.Sprintf("downstream %q: ", netConn.RemoteAddr())},
    93                 outgoing:      make(chan *irc.Message, 64),
     94                outgoing:      outgoing,
    9495                closed:        make(chan struct{}),
    9596                ringConsumers: make(map[*network]*RingConsumer),
     
    103104
    104105        go func() {
    105                 if err := dc.writeMessages(); err != nil {
    106                         dc.logger.Printf("failed to write message: %v", err)
     106                for msg := range outgoing {
     107                        if dc.srv.Debug {
     108                                dc.logger.Printf("sent: %v", msg)
     109                        }
     110                        dc.net.SetWriteDeadline(time.Now().Add(writeTimeout))
     111                        if err := dc.irc.WriteMessage(msg); err != nil {
     112                                dc.logger.Printf("failed to write message: %v", err)
     113                                break
     114                        }
    107115                }
    108116                if err := dc.net.Close(); err != nil {
     
    110118                } else {
    111119                        dc.logger.Printf("connection closed")
     120                }
     121                // Drain the outgoing channel to prevent SendMessage from blocking
     122                for range outgoing {
     123                        // This space is intentionally left blank
    112124                }
    113125        }()
     
    245257
    246258func (dc *downstreamConn) writeMessages() error {
    247         // TODO: any SendMessage call after the connection is closed will
    248         // either block or drop
    249         for {
    250                 var err error
    251                 var closed bool
    252                 select {
    253                 case msg := <-dc.outgoing:
    254                         if dc.srv.Debug {
    255                                 dc.logger.Printf("sent: %v", msg)
    256                         }
    257                         dc.net.SetWriteDeadline(time.Now().Add(writeTimeout))
    258                         err = dc.irc.WriteMessage(msg)
    259                 case <-dc.closed:
    260                         closed = true
    261                 }
    262                 if err != nil {
    263                         return err
    264                 }
    265                 if closed {
    266                         break
    267                 }
    268         }
    269259        return nil
    270260}
     
    276266        }
    277267        close(dc.closed)
     268        close(dc.outgoing)
    278269        return nil
    279270}
     
    282273// goroutine.
    283274func (dc *downstreamConn) SendMessage(msg *irc.Message) {
     275        if dc.isClosed() {
     276                return
     277        }
    284278        // TODO: strip tags if the client doesn't support them (see runNetwork)
    285279        dc.outgoing <- msg
  • trunk/upstream.go

    r206 r209  
    114114
    115115        go func() {
    116                 // TODO: any SendMessage call after the connection is closed will
    117                 // either block or drop
    118                 for {
    119                         var closed bool
    120                         select {
    121                         case msg := <-outgoing:
    122                                 if uc.srv.Debug {
    123                                         uc.logger.Printf("sent: %v", msg)
    124                                 }
    125                                 uc.net.SetWriteDeadline(time.Now().Add(writeTimeout))
    126                                 if err := uc.irc.WriteMessage(msg); err != nil {
    127                                         uc.logger.Printf("failed to write message: %v", err)
    128                                         closed = true
    129                                 }
    130                         case <-uc.closed:
    131                                 closed = true
    132                         }
    133                         if closed {
     116                for msg := range outgoing {
     117                        if uc.srv.Debug {
     118                                uc.logger.Printf("sent: %v", msg)
     119                        }
     120                        uc.net.SetWriteDeadline(time.Now().Add(writeTimeout))
     121                        if err := uc.irc.WriteMessage(msg); err != nil {
     122                                uc.logger.Printf("failed to write message: %v", err)
    134123                                break
    135124                        }
     
    139128                } else {
    140129                        uc.logger.Printf("connection closed")
     130                }
     131                // Drain the outgoing channel to prevent SendMessage from blocking
     132                for range outgoing {
     133                        // This space is intentionally left blank
    141134                }
    142135        }()
     
    160153        }
    161154        close(uc.closed)
     155        close(uc.outgoing)
    162156        return nil
    163157}
Note: See TracChangeset for help on using the changeset viewer.