Changeset 77 in code for trunk/server.go


Ignore:
Timestamp:
Mar 4, 2020, 5:22:58 PM (5 years ago)
Author:
contact
Message:

Add SQLite database

Closes: https://todo.sr.ht/~emersion/jounce/9

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/server.go

    r75 r77  
    4848}
    4949
     50type network struct {
     51        Network
     52        user *user
     53        conn *upstreamConn
     54}
     55
     56func newNetwork(user *user, record *Network) *network {
     57        return &network{
     58                Network: *record,
     59                user:    user,
     60        }
     61}
     62
     63func (net *network) run() {
     64        var lastTry time.Time
     65        for {
     66                if dur := time.Now().Sub(lastTry); dur < retryConnectMinDelay {
     67                        delay := retryConnectMinDelay - dur
     68                        net.user.srv.Logger.Printf("waiting %v before trying to reconnect to %q", delay.Truncate(time.Second), net.Addr)
     69                        time.Sleep(delay)
     70                }
     71                lastTry = time.Now()
     72
     73                uc, err := connectToUpstream(net)
     74                if err != nil {
     75                        net.user.srv.Logger.Printf("failed to connect to upstream server %q: %v", net.Addr, err)
     76                        continue
     77                }
     78
     79                uc.register()
     80
     81                net.user.lock.Lock()
     82                net.conn = uc
     83                net.user.lock.Unlock()
     84
     85                if err := uc.readMessages(); err != nil {
     86                        uc.logger.Printf("failed to handle messages: %v", err)
     87                }
     88                uc.Close()
     89
     90                net.user.lock.Lock()
     91                net.conn = nil
     92                net.user.lock.Unlock()
     93        }
     94}
     95
    5096type user struct {
    51         username string
     97        User
    5298        srv      *Server
    5399
    54100        lock            sync.Mutex
    55         upstreamConns   []*upstreamConn
     101        networks        []*network
    56102        downstreamConns []*downstreamConn
    57103}
    58104
    59 func newUser(srv *Server, username string) *user {
     105func newUser(srv *Server, record *User) *user {
    60106        return &user{
    61                 username: username,
    62                 srv:      srv,
     107                User: *record,
     108                srv:  srv,
    63109        }
    64110}
     
    66112func (u *user) forEachUpstream(f func(uc *upstreamConn)) {
    67113        u.lock.Lock()
    68         for _, uc := range u.upstreamConns {
    69                 if !uc.registered || uc.closed {
     114        for _, network := range u.networks {
     115                uc := network.conn
     116                if uc == nil || !uc.registered || uc.closed {
    70117                        continue
    71118                }
     
    83130}
    84131
    85 func (u *user) getUpstream(name string) *Upstream {
    86         for i, upstream := range u.srv.Upstreams {
    87                 if upstream.Addr == name {
    88                         return &u.srv.Upstreams[i]
     132func (u *user) getNetwork(name string) *network {
     133        for _, network := range u.networks {
     134                if network.Addr == name {
     135                        return network
    89136                }
    90137        }
     
    92139}
    93140
    94 type Upstream struct {
    95         Addr     string
    96         Nick     string
    97         Username string
    98         Realname string
    99         Channels []string
     141func (u *user) run() {
     142        networks, err := u.srv.db.ListNetworks(u.Username)
     143        if err != nil {
     144                u.srv.Logger.Printf("failed to list networks for user %q: %v", u.Username, err)
     145                return
     146        }
     147
     148        u.lock.Lock()
     149        for _, record := range networks {
     150                network := newNetwork(u, &record)
     151                u.networks = append(u.networks, network)
     152
     153                go network.run()
     154        }
     155        u.lock.Unlock()
    100156}
    101157
     
    105161        RingCap   int
    106162        Debug     bool
    107         Upstreams []Upstream // TODO: per-user
     163
     164        db *DB
    108165
    109166        lock            sync.Mutex
     
    112169}
    113170
    114 func NewServer() *Server {
     171func NewServer(db *DB) *Server {
    115172        return &Server{
    116173                Logger:  log.New(log.Writer(), "", log.LstdFlags),
    117174                RingCap: 4096,
    118175                users:   make(map[string]*user),
     176                db:      db,
    119177        }
    120178}
     
    124182}
    125183
    126 func (s *Server) runUpstream(u *user, upstream *Upstream) {
    127         var lastTry time.Time
    128         for {
    129                 if dur := time.Now().Sub(lastTry); dur < retryConnectMinDelay {
    130                         delay := retryConnectMinDelay - dur
    131                         s.Logger.Printf("waiting %v before trying to reconnect to %q", delay.Truncate(time.Second), upstream.Addr)
    132                         time.Sleep(delay)
    133                 }
    134                 lastTry = time.Now()
    135 
    136                 uc, err := connectToUpstream(u, upstream)
    137                 if err != nil {
    138                         s.Logger.Printf("failed to connect to upstream server %q: %v", upstream.Addr, err)
    139                         continue
    140                 }
    141 
    142                 uc.register()
    143 
    144                 u.lock.Lock()
    145                 u.upstreamConns = append(u.upstreamConns, uc)
    146                 u.lock.Unlock()
    147 
    148                 if err := uc.readMessages(); err != nil {
    149                         uc.logger.Printf("failed to handle messages: %v", err)
    150                 }
    151                 uc.Close()
    152 
    153                 u.lock.Lock()
    154                 for i := range u.upstreamConns {
    155                         if u.upstreamConns[i] == uc {
    156                                 u.upstreamConns = append(u.upstreamConns[:i], u.upstreamConns[i+1:]...)
    157                                 break
    158                         }
    159                 }
    160                 u.lock.Unlock()
    161         }
    162 }
    163 
    164 func (s *Server) Run() {
    165         // TODO: multi-user
    166         u := newUser(s, "jounce")
     184func (s *Server) Run() error {
     185        users, err := s.db.ListUsers()
     186        if err != nil {
     187                return err
     188        }
    167189
    168190        s.lock.Lock()
    169         s.users[u.username] = u
     191        for _, record := range users {
     192                s.Logger.Printf("starting bouncer for user %q", record.Username)
     193                u := newUser(s, &record)
     194                s.users[u.Username] = u
     195
     196                go u.run()
     197        }
    170198        s.lock.Unlock()
    171199
    172         for i := range s.Upstreams {
    173                 go s.runUpstream(u, &s.Upstreams[i])
    174         }
     200        select {}
    175201}
    176202
Note: See TracChangeset for help on using the changeset viewer.