Changeset 209 in code
- Timestamp:
- Apr 3, 2020, 2:15:25 PM (5 years ago)
- Location:
- trunk
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/downstream.go
r207 r209 58 58 srv *Server 59 59 logger Logger 60 outgoing chan *irc.Message60 outgoing chan<- *irc.Message 61 61 closed chan struct{} 62 62 … … 85 85 86 86 func newDownstreamConn(srv *Server, netConn net.Conn, id uint64) *downstreamConn { 87 outgoing := make(chan *irc.Message, 64) 87 88 dc := &downstreamConn{ 88 89 id: id, … … 91 92 srv: srv, 92 93 logger: &prefixLogger{srv.Logger, fmt.Sprintf("downstream %q: ", netConn.RemoteAddr())}, 93 outgoing: make(chan *irc.Message, 64),94 outgoing: outgoing, 94 95 closed: make(chan struct{}), 95 96 ringConsumers: make(map[*network]*RingConsumer), … … 103 104 104 105 go func() { 105 if err := dc.writeMessages(); err != nil { 106 dc.logger.Printf("failed to write message: %v", err) 106 for msg := range outgoing { 107 if dc.srv.Debug { 108 dc.logger.Printf("sent: %v", msg) 109 } 110 dc.net.SetWriteDeadline(time.Now().Add(writeTimeout)) 111 if err := dc.irc.WriteMessage(msg); err != nil { 112 dc.logger.Printf("failed to write message: %v", err) 113 break 114 } 107 115 } 108 116 if err := dc.net.Close(); err != nil { … … 110 118 } else { 111 119 dc.logger.Printf("connection closed") 120 } 121 // Drain the outgoing channel to prevent SendMessage from blocking 122 for range outgoing { 123 // This space is intentionally left blank 112 124 } 113 125 }() … … 245 257 246 258 func (dc *downstreamConn) writeMessages() error { 247 // TODO: any SendMessage call after the connection is closed will248 // either block or drop249 for {250 var err error251 var closed bool252 select {253 case msg := <-dc.outgoing:254 if dc.srv.Debug {255 dc.logger.Printf("sent: %v", msg)256 }257 dc.net.SetWriteDeadline(time.Now().Add(writeTimeout))258 err = dc.irc.WriteMessage(msg)259 case <-dc.closed:260 closed = true261 }262 if err != nil {263 return err264 }265 if closed {266 break267 }268 }269 259 return nil 270 260 } … … 276 266 } 277 267 close(dc.closed) 268 close(dc.outgoing) 278 269 return nil 279 270 } … … 282 273 // goroutine. 283 274 func (dc *downstreamConn) SendMessage(msg *irc.Message) { 275 if dc.isClosed() { 276 return 277 } 284 278 // TODO: strip tags if the client doesn't support them (see runNetwork) 285 279 dc.outgoing <- msg -
trunk/upstream.go
r206 r209 114 114 115 115 go func() { 116 // TODO: any SendMessage call after the connection is closed will 117 // either block or drop 118 for { 119 var closed bool 120 select { 121 case msg := <-outgoing: 122 if uc.srv.Debug { 123 uc.logger.Printf("sent: %v", msg) 124 } 125 uc.net.SetWriteDeadline(time.Now().Add(writeTimeout)) 126 if err := uc.irc.WriteMessage(msg); err != nil { 127 uc.logger.Printf("failed to write message: %v", err) 128 closed = true 129 } 130 case <-uc.closed: 131 closed = true 132 } 133 if closed { 116 for msg := range outgoing { 117 if uc.srv.Debug { 118 uc.logger.Printf("sent: %v", msg) 119 } 120 uc.net.SetWriteDeadline(time.Now().Add(writeTimeout)) 121 if err := uc.irc.WriteMessage(msg); err != nil { 122 uc.logger.Printf("failed to write message: %v", err) 134 123 break 135 124 } … … 139 128 } else { 140 129 uc.logger.Printf("connection closed") 130 } 131 // Drain the outgoing channel to prevent SendMessage from blocking 132 for range outgoing { 133 // This space is intentionally left blank 141 134 } 142 135 }() … … 160 153 } 161 154 close(uc.closed) 155 close(uc.outgoing) 162 156 return nil 163 157 }
Note:
See TracChangeset
for help on using the changeset viewer.