- Timestamp:
- Apr 3, 2020, 2:35:08 PM (5 years ago)
- Location:
- trunk
- Files:
-
- 1 added
- 3 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/downstream.go
r209 r210 53 53 54 54 type downstreamConn struct { 55 id uint64 56 net net.Conn 57 irc *irc.Conn 58 srv *Server 59 logger Logger 60 outgoing chan<- *irc.Message 61 closed chan struct{} 55 conn 56 57 id uint64 62 58 63 59 registered bool … … 85 81 86 82 func newDownstreamConn(srv *Server, netConn net.Conn, id uint64) *downstreamConn { 87 outgoing := make(chan *irc.Message, 64)83 logger := &prefixLogger{srv.Logger, fmt.Sprintf("downstream %q: ", netConn.RemoteAddr())} 88 84 dc := &downstreamConn{ 85 conn: *newConn(srv, netConn, logger), 89 86 id: id, 90 net: netConn,91 irc: irc.NewConn(netConn),92 srv: srv,93 logger: &prefixLogger{srv.Logger, fmt.Sprintf("downstream %q: ", netConn.RemoteAddr())},94 outgoing: outgoing,95 closed: make(chan struct{}),96 87 ringConsumers: make(map[*network]*RingConsumer), 97 88 caps: make(map[string]bool), … … 102 93 dc.hostname = host 103 94 } 104 105 go func() {106 for msg := range outgoing {107 if dc.srv.Debug {108 dc.logger.Printf("sent: %v", msg)109 }110 dc.net.SetWriteDeadline(time.Now().Add(writeTimeout))111 if err := dc.irc.WriteMessage(msg); err != nil {112 dc.logger.Printf("failed to write message: %v", err)113 break114 }115 }116 if err := dc.net.Close(); err != nil {117 dc.logger.Printf("failed to close connection: %v", err)118 } else {119 dc.logger.Printf("connection closed")120 }121 // Drain the outgoing channel to prevent SendMessage from blocking122 for range outgoing {123 // This space is intentionally left blank124 }125 }()126 127 dc.logger.Printf("new connection")128 95 return dc 129 96 } … … 228 195 } 229 196 230 func (dc *downstreamConn) isClosed() bool {231 select {232 case <-dc.closed:233 return true234 default:235 return false236 }237 }238 239 197 func (dc *downstreamConn) readMessages(ch chan<- event) error { 240 198 for { 241 msg, err := dc. irc.ReadMessage()199 msg, err := dc.ReadMessage() 242 200 if err == io.EOF { 243 201 break … … 246 204 } 247 205 248 if dc.srv.Debug {249 dc.logger.Printf("received: %v", msg)250 }251 252 206 ch <- eventDownstreamMessage{msg, dc} 253 207 } … … 256 210 } 257 211 258 func (dc *downstreamConn) writeMessages() error {259 return nil260 }261 262 // Close closes the connection. It is safe to call from any goroutine.263 func (dc *downstreamConn) Close() error {264 if dc.isClosed() {265 return fmt.Errorf("downstream connection already closed")266 }267 close(dc.closed)268 close(dc.outgoing)269 return nil270 }271 272 // SendMessage queues a new outgoing message. It is safe to call from any273 // goroutine.274 212 func (dc *downstreamConn) SendMessage(msg *irc.Message) { 275 if dc.isClosed() {276 return277 }278 213 // TODO: strip tags if the client doesn't support them (see runNetwork) 279 dc. outgoing <- msg214 dc.conn.SendMessage(msg) 280 215 } 281 216 -
trunk/server.go
r206 r210 16 16 var connectTimeout = 15 * time.Second 17 17 var writeTimeout = 10 * time.Second 18 19 func setKeepAlive(c net.Conn) error {20 tcpConn, ok := c.(*net.TCPConn)21 if !ok {22 return fmt.Errorf("cannot enable keep-alive on a non-TCP connection")23 }24 if err := tcpConn.SetKeepAlive(true); err != nil {25 return err26 }27 return tcpConn.SetKeepAlivePeriod(keepAlivePeriod)28 }29 18 30 19 type Logger interface { … … 110 99 } 111 100 112 setKeepAlive(netConn)113 114 101 dc := newDownstreamConn(s, netConn, nextDownstreamID) 115 102 nextDownstreamID++ -
trunk/upstream.go
r209 r210 32 32 33 33 type upstreamConn struct { 34 network *network 35 logger Logger 36 net net.Conn 37 irc *irc.Conn 38 srv *Server 39 user *user 40 outgoing chan<- *irc.Message 41 closed chan struct{} 34 conn 35 36 network *network 37 user *user 42 38 43 39 serverName string … … 91 87 } 92 88 93 setKeepAlive(netConn)94 95 outgoing := make(chan *irc.Message, 64)96 89 uc := &upstreamConn{ 90 conn: *newConn(network.user.srv, netConn, logger), 97 91 network: network, 98 logger: logger,99 net: netConn,100 irc: irc.NewConn(netConn),101 srv: network.user.srv,102 92 user: network.user, 103 outgoing: outgoing,104 closed: make(chan struct{}),105 93 channels: make(map[string]*upstreamChannel), 106 94 caps: make(map[string]string), … … 113 101 } 114 102 115 go func() {116 for msg := range outgoing {117 if uc.srv.Debug {118 uc.logger.Printf("sent: %v", msg)119 }120 uc.net.SetWriteDeadline(time.Now().Add(writeTimeout))121 if err := uc.irc.WriteMessage(msg); err != nil {122 uc.logger.Printf("failed to write message: %v", err)123 break124 }125 }126 if err := uc.net.Close(); err != nil {127 uc.logger.Printf("failed to close connection: %v", err)128 } else {129 uc.logger.Printf("connection closed")130 }131 // Drain the outgoing channel to prevent SendMessage from blocking132 for range outgoing {133 // This space is intentionally left blank134 }135 }()136 137 103 return uc, nil 138 }139 140 func (uc *upstreamConn) isClosed() bool {141 select {142 case <-uc.closed:143 return true144 default:145 return false146 }147 }148 149 // Close closes the connection. It is safe to call from any goroutine.150 func (uc *upstreamConn) Close() error {151 if uc.isClosed() {152 return fmt.Errorf("upstream connection already closed")153 }154 close(uc.closed)155 close(uc.outgoing)156 return nil157 104 } 158 105 … … 1410 1357 func (uc *upstreamConn) readMessages(ch chan<- event) error { 1411 1358 for { 1412 msg, err := uc. irc.ReadMessage()1359 msg, err := uc.ReadMessage() 1413 1360 if err == io.EOF { 1414 1361 break … … 1417 1364 } 1418 1365 1419 if uc.srv.Debug {1420 uc.logger.Printf("received: %v", msg)1421 }1422 1423 1366 ch <- eventUpstreamMessage{msg, uc} 1424 1367 } 1425 1368 1426 1369 return nil 1427 }1428 1429 // SendMessage queues a new outgoing message. It is safe to call from any1430 // goroutine.1431 func (uc *upstreamConn) SendMessage(msg *irc.Message) {1432 uc.outgoing <- msg1433 1370 } 1434 1371
Note:
See TracChangeset
for help on using the changeset viewer.