Changeset 210 in code for trunk/downstream.go
- Timestamp:
- Apr 3, 2020, 2:35:08 PM (5 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/downstream.go
r209 r210 53 53 54 54 type downstreamConn struct { 55 id uint64 56 net net.Conn 57 irc *irc.Conn 58 srv *Server 59 logger Logger 60 outgoing chan<- *irc.Message 61 closed chan struct{} 55 conn 56 57 id uint64 62 58 63 59 registered bool … … 85 81 86 82 func newDownstreamConn(srv *Server, netConn net.Conn, id uint64) *downstreamConn { 87 outgoing := make(chan *irc.Message, 64)83 logger := &prefixLogger{srv.Logger, fmt.Sprintf("downstream %q: ", netConn.RemoteAddr())} 88 84 dc := &downstreamConn{ 85 conn: *newConn(srv, netConn, logger), 89 86 id: id, 90 net: netConn,91 irc: irc.NewConn(netConn),92 srv: srv,93 logger: &prefixLogger{srv.Logger, fmt.Sprintf("downstream %q: ", netConn.RemoteAddr())},94 outgoing: outgoing,95 closed: make(chan struct{}),96 87 ringConsumers: make(map[*network]*RingConsumer), 97 88 caps: make(map[string]bool), … … 102 93 dc.hostname = host 103 94 } 104 105 go func() {106 for msg := range outgoing {107 if dc.srv.Debug {108 dc.logger.Printf("sent: %v", msg)109 }110 dc.net.SetWriteDeadline(time.Now().Add(writeTimeout))111 if err := dc.irc.WriteMessage(msg); err != nil {112 dc.logger.Printf("failed to write message: %v", err)113 break114 }115 }116 if err := dc.net.Close(); err != nil {117 dc.logger.Printf("failed to close connection: %v", err)118 } else {119 dc.logger.Printf("connection closed")120 }121 // Drain the outgoing channel to prevent SendMessage from blocking122 for range outgoing {123 // This space is intentionally left blank124 }125 }()126 127 dc.logger.Printf("new connection")128 95 return dc 129 96 } … … 228 195 } 229 196 230 func (dc *downstreamConn) isClosed() bool {231 select {232 case <-dc.closed:233 return true234 default:235 return false236 }237 }238 239 197 func (dc *downstreamConn) readMessages(ch chan<- event) error { 240 198 for { 241 msg, err := dc. irc.ReadMessage()199 msg, err := dc.ReadMessage() 242 200 if err == io.EOF { 243 201 break … … 246 204 } 247 205 248 if dc.srv.Debug {249 dc.logger.Printf("received: %v", msg)250 }251 252 206 ch <- eventDownstreamMessage{msg, dc} 253 207 } … … 256 210 } 257 211 258 func (dc *downstreamConn) writeMessages() error {259 return nil260 }261 262 // Close closes the connection. It is safe to call from any goroutine.263 func (dc *downstreamConn) Close() error {264 if dc.isClosed() {265 return fmt.Errorf("downstream connection already closed")266 }267 close(dc.closed)268 close(dc.outgoing)269 return nil270 }271 272 // SendMessage queues a new outgoing message. It is safe to call from any273 // goroutine.274 212 func (dc *downstreamConn) SendMessage(msg *irc.Message) { 275 if dc.isClosed() {276 return277 }278 213 // TODO: strip tags if the client doesn't support them (see runNetwork) 279 dc. outgoing <- msg214 dc.conn.SendMessage(msg) 280 215 } 281 216
Note:
See TracChangeset
for help on using the changeset viewer.