Changeset 71 in code for trunk


Ignore:
Timestamp:
Mar 3, 2020, 2:26:19 PM (5 years ago)
Author:
contact
Message:

Retry connecting to upstream servers

Rate-limit retries in case connecting immediately fails.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/server.go

    r67 r71  
    1313// TODO: make configurable
    1414var keepAlivePeriod = time.Minute
     15var retryConnectMinDelay = time.Minute
    1516
    1617func setKeepAlive(c net.Conn) error {
     
    138139}
    139140
     141func (s *Server) runUpstream(u *user, upstream *Upstream) {
     142        var lastTry time.Time
     143        for {
     144                if dur := time.Now().Sub(lastTry); dur < retryConnectMinDelay {
     145                        delay := retryConnectMinDelay - dur
     146                        s.Logger.Printf("waiting %v before trying to reconnect to %q", delay.Truncate(time.Second), upstream.Addr)
     147                        time.Sleep(delay)
     148                }
     149                lastTry = time.Now()
     150
     151                uc, err := connectToUpstream(u, upstream)
     152                if err != nil {
     153                        s.Logger.Printf("failed to connect to upstream server %q: %v", upstream.Addr, err)
     154                        continue
     155                }
     156
     157                uc.register()
     158
     159                u.lock.Lock()
     160                u.upstreamConns = append(u.upstreamConns, uc)
     161                u.lock.Unlock()
     162
     163                if err := uc.readMessages(); err != nil {
     164                        uc.logger.Printf("failed to handle messages: %v", err)
     165                }
     166                uc.Close()
     167
     168                u.lock.Lock()
     169                for i := range u.upstreamConns {
     170                        if u.upstreamConns[i] == uc {
     171                                u.upstreamConns = append(u.upstreamConns[:i], u.upstreamConns[i+1:]...)
     172                                break
     173                        }
     174                }
     175                u.lock.Unlock()
     176        }
     177}
     178
    140179func (s *Server) Run() {
    141180        // TODO: multi-user
     
    147186
    148187        for i := range s.Upstreams {
    149                 upstream := &s.Upstreams[i]
    150                 // TODO: retry connecting
    151                 go func() {
    152                         uc, err := connectToUpstream(u, upstream)
    153                         if err != nil {
    154                                 s.Logger.Printf("failed to connect to upstream server %q: %v", upstream.Addr, err)
    155                                 return
    156                         }
    157 
    158                         uc.register()
    159 
    160                         u.lock.Lock()
    161                         u.upstreamConns = append(u.upstreamConns, uc)
    162                         u.lock.Unlock()
    163 
    164                         if err := uc.readMessages(); err != nil {
    165                                 uc.logger.Printf("failed to handle messages: %v", err)
    166                         }
    167                         uc.Close()
    168 
    169                         u.lock.Lock()
    170                         for i := range u.upstreamConns {
    171                                 if u.upstreamConns[i] == uc {
    172                                         u.upstreamConns = append(u.upstreamConns[:i], u.upstreamConns[i+1:]...)
    173                                         break
    174                                 }
    175                         }
    176                         u.lock.Unlock()
    177                 }()
     188                go s.runUpstream(u, &s.Upstreams[i])
    178189        }
    179190}
Note: See TracChangeset for help on using the changeset viewer.