source: code/trunk/server.go@ 693

Last change on this file since 693 was 691, checked in by contact, 4 years ago

Allow most config options to be reloaded

Closes: https://todo.sr.ht/~emersion/soju/42

File size: 6.2 KB
RevLine 
[98]1package soju
[1]2
3import (
[652]4 "context"
[656]5 "errors"
[1]6 "fmt"
[656]7 "io"
[37]8 "log"
[472]9 "mime"
[1]10 "net"
[323]11 "net/http"
[689]12 "runtime/debug"
[24]13 "sync"
[323]14 "sync/atomic"
[67]15 "time"
[1]16
17 "gopkg.in/irc.v3"
[323]18 "nhooyr.io/websocket"
[370]19
20 "git.sr.ht/~emersion/soju/config"
[1]21)
22
[67]23// TODO: make configurable
[398]24var retryConnectDelay = time.Minute
[206]25var connectTimeout = 15 * time.Second
[205]26var writeTimeout = 10 * time.Second
[398]27var upstreamMessageDelay = 2 * time.Second
28var upstreamMessageBurst = 10
[675]29var backlogTimeout = 10 * time.Second
30var handleDownstreamMessageTimeout = 10 * time.Second
[670]31var chatHistoryLimit = 1000
32var backlogLimit = 4000
[67]33
[9]34type Logger interface {
35 Print(v ...interface{})
36 Printf(format string, v ...interface{})
37}
38
[21]39type prefixLogger struct {
40 logger Logger
41 prefix string
42}
43
44var _ Logger = (*prefixLogger)(nil)
45
46func (l *prefixLogger) Print(v ...interface{}) {
47 v = append([]interface{}{l.prefix}, v...)
48 l.logger.Print(v...)
49}
50
51func (l *prefixLogger) Printf(format string, v ...interface{}) {
52 v = append([]interface{}{l.prefix}, v...)
53 l.logger.Printf("%v"+format, v...)
54}
55
[691]56type Config struct {
[612]57 Hostname string
[662]58 Title string
[612]59 LogPath string
60 Debug bool
61 HTTPOrigins []string
62 AcceptProxyIPs config.IPSet
63 MaxUserNetworks int
[691]64 MOTD string
65}
[22]66
[691]67type Server struct {
68 Logger Logger
69 Identd *Identd // can be nil
70
71 config atomic.Value // *Config
[605]72 db Database
73 stopWG sync.WaitGroup
74 connCount int64 // atomic
[77]75
[449]76 lock sync.Mutex
77 listeners map[net.Listener]struct{}
78 users map[string]*user
[10]79}
80
[531]81func NewServer(db Database) *Server {
[636]82 srv := &Server{
[691]83 Logger: log.New(log.Writer(), "", log.LstdFlags),
84 db: db,
85 listeners: make(map[net.Listener]struct{}),
86 users: make(map[string]*user),
[37]87 }
[691]88 srv.config.Store(&Config{Hostname: "localhost", MaxUserNetworks: -1})
[636]89 return srv
[37]90}
91
[5]92func (s *Server) prefix() *irc.Prefix {
[691]93 return &irc.Prefix{Name: s.Config().Hostname}
[5]94}
95
[691]96func (s *Server) Config() *Config {
97 return s.config.Load().(*Config)
98}
99
100func (s *Server) SetConfig(cfg *Config) {
101 s.config.Store(cfg)
102}
103
[449]104func (s *Server) Start() error {
[652]105 users, err := s.db.ListUsers(context.TODO())
[77]106 if err != nil {
107 return err
108 }
[71]109
[77]110 s.lock.Lock()
[378]111 for i := range users {
112 s.addUserLocked(&users[i])
[71]113 }
[37]114 s.lock.Unlock()
115
[449]116 return nil
[10]117}
118
[449]119func (s *Server) Shutdown() {
120 s.lock.Lock()
121 for ln := range s.listeners {
122 if err := ln.Close(); err != nil {
123 s.Logger.Printf("failed to stop listener: %v", err)
124 }
125 }
126 for _, u := range s.users {
127 u.events <- eventStop{}
128 }
129 s.lock.Unlock()
130
131 s.stopWG.Wait()
[599]132
133 if err := s.db.Close(); err != nil {
134 s.Logger.Printf("failed to close DB: %v", err)
135 }
[449]136}
137
[680]138func (s *Server) createUser(ctx context.Context, user *User) (*user, error) {
[329]139 s.lock.Lock()
140 defer s.lock.Unlock()
141
142 if _, ok := s.users[user.Username]; ok {
143 return nil, fmt.Errorf("user %q already exists", user.Username)
144 }
145
[680]146 err := s.db.StoreUser(ctx, user)
[329]147 if err != nil {
148 return nil, fmt.Errorf("could not create user in db: %v", err)
149 }
150
[378]151 return s.addUserLocked(user), nil
[329]152}
153
[563]154func (s *Server) forEachUser(f func(*user)) {
155 s.lock.Lock()
156 for _, u := range s.users {
157 f(u)
158 }
159 s.lock.Unlock()
160}
161
[38]162func (s *Server) getUser(name string) *user {
163 s.lock.Lock()
164 u := s.users[name]
165 s.lock.Unlock()
166 return u
167}
168
[378]169func (s *Server) addUserLocked(user *User) *user {
170 s.Logger.Printf("starting bouncer for user %q", user.Username)
171 u := newUser(s, user)
172 s.users[u.Username] = u
173
[449]174 s.stopWG.Add(1)
175
[378]176 go func() {
[689]177 defer func() {
178 if err := recover(); err != nil {
179 s.Logger.Printf("panic serving user %q: %v\n%v", user.Username, err, debug.Stack())
180 }
181 }()
182
[378]183 u.run()
184
185 s.lock.Lock()
186 delete(s.users, u.Username)
187 s.lock.Unlock()
[449]188
189 s.stopWG.Done()
[378]190 }()
191
192 return u
193}
194
[323]195var lastDownstreamID uint64 = 0
196
[347]197func (s *Server) handle(ic ircConn) {
[689]198 defer func() {
199 if err := recover(); err != nil {
200 s.Logger.Printf("panic serving downstream %q: %v\n%v", ic.RemoteAddr(), err, debug.Stack())
201 }
202 }()
203
[605]204 atomic.AddInt64(&s.connCount, 1)
[323]205 id := atomic.AddUint64(&lastDownstreamID, 1)
[347]206 dc := newDownstreamConn(s, ic, id)
[323]207 if err := dc.runUntilRegistered(); err != nil {
[655]208 if !errors.Is(err, io.EOF) {
209 dc.logger.Print(err)
210 }
[323]211 } else {
212 dc.user.events <- eventDownstreamConnected{dc}
213 if err := dc.readMessages(dc.user.events); err != nil {
214 dc.logger.Print(err)
215 }
216 dc.user.events <- eventDownstreamDisconnected{dc}
217 }
218 dc.Close()
[605]219 atomic.AddInt64(&s.connCount, -1)
[323]220}
221
[3]222func (s *Server) Serve(ln net.Listener) error {
[449]223 s.lock.Lock()
224 s.listeners[ln] = struct{}{}
225 s.lock.Unlock()
226
227 s.stopWG.Add(1)
228
229 defer func() {
230 s.lock.Lock()
231 delete(s.listeners, ln)
232 s.lock.Unlock()
233
234 s.stopWG.Done()
235 }()
236
[1]237 for {
[323]238 conn, err := ln.Accept()
[601]239 if isErrClosed(err) {
[449]240 return nil
241 } else if err != nil {
[1]242 return fmt.Errorf("failed to accept connection: %v", err)
243 }
244
[347]245 go s.handle(newNetIRCConn(conn))
[1]246 }
247}
[323]248
249func (s *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
250 conn, err := websocket.Accept(w, req, &websocket.AcceptOptions{
[597]251 Subprotocols: []string{"text.ircv3.net"}, // non-compliant, fight me
[691]252 OriginPatterns: s.Config().HTTPOrigins,
[323]253 })
254 if err != nil {
255 s.Logger.Printf("failed to serve HTTP connection: %v", err)
256 return
257 }
[345]258
[370]259 isProxy := false
[345]260 if host, _, err := net.SplitHostPort(req.RemoteAddr); err == nil {
261 if ip := net.ParseIP(host); ip != nil {
[691]262 isProxy = s.Config().AcceptProxyIPs.Contains(ip)
[345]263 }
264 }
265
[474]266 // Only trust the Forwarded header field if this is a trusted proxy IP
[345]267 // to prevent users from spoofing the remote address
[344]268 remoteAddr := req.RemoteAddr
[472]269 if isProxy {
270 forwarded := parseForwarded(req.Header)
[473]271 if forwarded["for"] != "" {
272 remoteAddr = forwarded["for"]
[472]273 }
[344]274 }
[345]275
[347]276 s.handle(newWebsocketIRCConn(conn, remoteAddr))
[323]277}
[472]278
279func parseForwarded(h http.Header) map[string]string {
280 forwarded := h.Get("Forwarded")
281 if forwarded == "" {
[474]282 return map[string]string{
283 "for": h.Get("X-Forwarded-For"),
284 "proto": h.Get("X-Forwarded-Proto"),
285 "host": h.Get("X-Forwarded-Host"),
286 }
[472]287 }
288 // Hack to easily parse header parameters
289 _, params, _ := mime.ParseMediaType("hack; " + forwarded)
290 return params
291}
[605]292
293type ServerStats struct {
294 Users int
295 Downstreams int64
296}
297
298func (s *Server) Stats() *ServerStats {
299 var stats ServerStats
300 s.lock.Lock()
301 stats.Users = len(s.users)
302 s.lock.Unlock()
303 stats.Downstreams = atomic.LoadInt64(&s.connCount)
304 return &stats
305}
Note: See TracBrowser for help on using the repository browser.