Changeset 210 in code for trunk


Ignore:
Timestamp:
Apr 3, 2020, 2:35:08 PM (5 years ago)
Author:
contact
Message:

Introduce conn for common connection logic

This centralizes the common upstream & downstream bits.

Location:
trunk
Files:
1 added
3 edited

Legend:

Unmodified
Added
Removed
  • trunk/downstream.go

    r209 r210  
    5353
    5454type downstreamConn struct {
    55         id       uint64
    56         net      net.Conn
    57         irc      *irc.Conn
    58         srv      *Server
    59         logger   Logger
    60         outgoing chan<- *irc.Message
    61         closed   chan struct{}
     55        conn
     56
     57        id uint64
    6258
    6359        registered  bool
     
    8581
    8682func newDownstreamConn(srv *Server, netConn net.Conn, id uint64) *downstreamConn {
    87         outgoing := make(chan *irc.Message, 64)
     83        logger := &prefixLogger{srv.Logger, fmt.Sprintf("downstream %q: ", netConn.RemoteAddr())}
    8884        dc := &downstreamConn{
     85                conn:          *newConn(srv, netConn, logger),
    8986                id:            id,
    90                 net:           netConn,
    91                 irc:           irc.NewConn(netConn),
    92                 srv:           srv,
    93                 logger:        &prefixLogger{srv.Logger, fmt.Sprintf("downstream %q: ", netConn.RemoteAddr())},
    94                 outgoing:      outgoing,
    95                 closed:        make(chan struct{}),
    9687                ringConsumers: make(map[*network]*RingConsumer),
    9788                caps:          make(map[string]bool),
     
    10293                dc.hostname = host
    10394        }
    104 
    105         go func() {
    106                 for msg := range outgoing {
    107                         if dc.srv.Debug {
    108                                 dc.logger.Printf("sent: %v", msg)
    109                         }
    110                         dc.net.SetWriteDeadline(time.Now().Add(writeTimeout))
    111                         if err := dc.irc.WriteMessage(msg); err != nil {
    112                                 dc.logger.Printf("failed to write message: %v", err)
    113                                 break
    114                         }
    115                 }
    116                 if err := dc.net.Close(); err != nil {
    117                         dc.logger.Printf("failed to close connection: %v", err)
    118                 } else {
    119                         dc.logger.Printf("connection closed")
    120                 }
    121                 // Drain the outgoing channel to prevent SendMessage from blocking
    122                 for range outgoing {
    123                         // This space is intentionally left blank
    124                 }
    125         }()
    126 
    127         dc.logger.Printf("new connection")
    12895        return dc
    12996}
     
    228195}
    229196
    230 func (dc *downstreamConn) isClosed() bool {
    231         select {
    232         case <-dc.closed:
    233                 return true
    234         default:
    235                 return false
    236         }
    237 }
    238 
    239197func (dc *downstreamConn) readMessages(ch chan<- event) error {
    240198        for {
    241                 msg, err := dc.irc.ReadMessage()
     199                msg, err := dc.ReadMessage()
    242200                if err == io.EOF {
    243201                        break
     
    246204                }
    247205
    248                 if dc.srv.Debug {
    249                         dc.logger.Printf("received: %v", msg)
    250                 }
    251 
    252206                ch <- eventDownstreamMessage{msg, dc}
    253207        }
     
    256210}
    257211
    258 func (dc *downstreamConn) writeMessages() error {
    259         return nil
    260 }
    261 
    262 // Close closes the connection. It is safe to call from any goroutine.
    263 func (dc *downstreamConn) Close() error {
    264         if dc.isClosed() {
    265                 return fmt.Errorf("downstream connection already closed")
    266         }
    267         close(dc.closed)
    268         close(dc.outgoing)
    269         return nil
    270 }
    271 
    272 // SendMessage queues a new outgoing message. It is safe to call from any
    273 // goroutine.
    274212func (dc *downstreamConn) SendMessage(msg *irc.Message) {
    275         if dc.isClosed() {
    276                 return
    277         }
    278213        // TODO: strip tags if the client doesn't support them (see runNetwork)
    279         dc.outgoing <- msg
     214        dc.conn.SendMessage(msg)
    280215}
    281216
  • trunk/server.go

    r206 r210  
    1616var connectTimeout = 15 * time.Second
    1717var writeTimeout = 10 * time.Second
    18 
    19 func setKeepAlive(c net.Conn) error {
    20         tcpConn, ok := c.(*net.TCPConn)
    21         if !ok {
    22                 return fmt.Errorf("cannot enable keep-alive on a non-TCP connection")
    23         }
    24         if err := tcpConn.SetKeepAlive(true); err != nil {
    25                 return err
    26         }
    27         return tcpConn.SetKeepAlivePeriod(keepAlivePeriod)
    28 }
    2918
    3019type Logger interface {
     
    11099                }
    111100
    112                 setKeepAlive(netConn)
    113 
    114101                dc := newDownstreamConn(s, netConn, nextDownstreamID)
    115102                nextDownstreamID++
  • trunk/upstream.go

    r209 r210  
    3232
    3333type upstreamConn struct {
    34         network  *network
    35         logger   Logger
    36         net      net.Conn
    37         irc      *irc.Conn
    38         srv      *Server
    39         user     *user
    40         outgoing chan<- *irc.Message
    41         closed   chan struct{}
     34        conn
     35
     36        network *network
     37        user    *user
    4238
    4339        serverName            string
     
    9187        }
    9288
    93         setKeepAlive(netConn)
    94 
    95         outgoing := make(chan *irc.Message, 64)
    9689        uc := &upstreamConn{
     90                conn:                     *newConn(network.user.srv, netConn, logger),
    9791                network:                  network,
    98                 logger:                   logger,
    99                 net:                      netConn,
    100                 irc:                      irc.NewConn(netConn),
    101                 srv:                      network.user.srv,
    10292                user:                     network.user,
    103                 outgoing:                 outgoing,
    104                 closed:                   make(chan struct{}),
    10593                channels:                 make(map[string]*upstreamChannel),
    10694                caps:                     make(map[string]string),
     
    113101        }
    114102
    115         go func() {
    116                 for msg := range outgoing {
    117                         if uc.srv.Debug {
    118                                 uc.logger.Printf("sent: %v", msg)
    119                         }
    120                         uc.net.SetWriteDeadline(time.Now().Add(writeTimeout))
    121                         if err := uc.irc.WriteMessage(msg); err != nil {
    122                                 uc.logger.Printf("failed to write message: %v", err)
    123                                 break
    124                         }
    125                 }
    126                 if err := uc.net.Close(); err != nil {
    127                         uc.logger.Printf("failed to close connection: %v", err)
    128                 } else {
    129                         uc.logger.Printf("connection closed")
    130                 }
    131                 // Drain the outgoing channel to prevent SendMessage from blocking
    132                 for range outgoing {
    133                         // This space is intentionally left blank
    134                 }
    135         }()
    136 
    137103        return uc, nil
    138 }
    139 
    140 func (uc *upstreamConn) isClosed() bool {
    141         select {
    142         case <-uc.closed:
    143                 return true
    144         default:
    145                 return false
    146         }
    147 }
    148 
    149 // Close closes the connection. It is safe to call from any goroutine.
    150 func (uc *upstreamConn) Close() error {
    151         if uc.isClosed() {
    152                 return fmt.Errorf("upstream connection already closed")
    153         }
    154         close(uc.closed)
    155         close(uc.outgoing)
    156         return nil
    157104}
    158105
     
    14101357func (uc *upstreamConn) readMessages(ch chan<- event) error {
    14111358        for {
    1412                 msg, err := uc.irc.ReadMessage()
     1359                msg, err := uc.ReadMessage()
    14131360                if err == io.EOF {
    14141361                        break
     
    14171364                }
    14181365
    1419                 if uc.srv.Debug {
    1420                         uc.logger.Printf("received: %v", msg)
    1421                 }
    1422 
    14231366                ch <- eventUpstreamMessage{msg, uc}
    14241367        }
    14251368
    14261369        return nil
    1427 }
    1428 
    1429 // SendMessage queues a new outgoing message. It is safe to call from any
    1430 // goroutine.
    1431 func (uc *upstreamConn) SendMessage(msg *irc.Message) {
    1432         uc.outgoing <- msg
    14331370}
    14341371
Note: See TracChangeset for help on using the changeset viewer.