source: code/trunk/server.go@ 471

Last change on this file since 471 was 471, checked in by contact, 4 years ago

Drop "irc" WebSocket subprotocol

The subprotocol hasn't been standardized yet. It looks like the standard
is moving in another direction.

File size: 4.5 KB
RevLine 
[98]1package soju
[1]2
3import (
4 "fmt"
[37]5 "log"
[1]6 "net"
[323]7 "net/http"
[449]8 "strings"
[24]9 "sync"
[323]10 "sync/atomic"
[67]11 "time"
[1]12
13 "gopkg.in/irc.v3"
[323]14 "nhooyr.io/websocket"
[370]15
16 "git.sr.ht/~emersion/soju/config"
[1]17)
18
[67]19// TODO: make configurable
[398]20var retryConnectDelay = time.Minute
[206]21var connectTimeout = 15 * time.Second
[205]22var writeTimeout = 10 * time.Second
[398]23var upstreamMessageDelay = 2 * time.Second
24var upstreamMessageBurst = 10
[67]25
[9]26type Logger interface {
27 Print(v ...interface{})
28 Printf(format string, v ...interface{})
29}
30
[21]31type prefixLogger struct {
32 logger Logger
33 prefix string
34}
35
36var _ Logger = (*prefixLogger)(nil)
37
38func (l *prefixLogger) Print(v ...interface{}) {
39 v = append([]interface{}{l.prefix}, v...)
40 l.logger.Print(v...)
41}
42
43func (l *prefixLogger) Printf(format string, v ...interface{}) {
44 v = append([]interface{}{l.prefix}, v...)
45 l.logger.Printf("%v"+format, v...)
46}
47
[10]48type Server struct {
[370]49 Hostname string
50 Logger Logger
51 HistoryLimit int
52 LogPath string
53 Debug bool
54 HTTPOrigins []string
55 AcceptProxyIPs config.IPSet
[385]56 Identd *Identd // can be nil
[22]57
[449]58 db *DB
59 stopWG sync.WaitGroup
[77]60
[449]61 lock sync.Mutex
62 listeners map[net.Listener]struct{}
63 users map[string]*user
[10]64}
65
[77]66func NewServer(db *DB) *Server {
[37]67 return &Server{
[319]68 Logger: log.New(log.Writer(), "", log.LstdFlags),
69 HistoryLimit: 1000,
[449]70 db: db,
71 listeners: make(map[net.Listener]struct{}),
[319]72 users: make(map[string]*user),
[37]73 }
74}
75
[5]76func (s *Server) prefix() *irc.Prefix {
77 return &irc.Prefix{Name: s.Hostname}
78}
79
[449]80func (s *Server) Start() error {
[77]81 users, err := s.db.ListUsers()
82 if err != nil {
83 return err
84 }
[71]85
[77]86 s.lock.Lock()
[378]87 for i := range users {
88 s.addUserLocked(&users[i])
[71]89 }
[37]90 s.lock.Unlock()
91
[449]92 return nil
[10]93}
94
[449]95func (s *Server) Shutdown() {
96 s.lock.Lock()
97 for ln := range s.listeners {
98 if err := ln.Close(); err != nil {
99 s.Logger.Printf("failed to stop listener: %v", err)
100 }
101 }
102 for _, u := range s.users {
103 u.events <- eventStop{}
104 }
105 s.lock.Unlock()
106
107 s.stopWG.Wait()
108}
109
[329]110func (s *Server) createUser(user *User) (*user, error) {
111 s.lock.Lock()
112 defer s.lock.Unlock()
113
114 if _, ok := s.users[user.Username]; ok {
115 return nil, fmt.Errorf("user %q already exists", user.Username)
116 }
117
118 err := s.db.StoreUser(user)
119 if err != nil {
120 return nil, fmt.Errorf("could not create user in db: %v", err)
121 }
122
[378]123 return s.addUserLocked(user), nil
[329]124}
125
[38]126func (s *Server) getUser(name string) *user {
127 s.lock.Lock()
128 u := s.users[name]
129 s.lock.Unlock()
130 return u
131}
132
[378]133func (s *Server) addUserLocked(user *User) *user {
134 s.Logger.Printf("starting bouncer for user %q", user.Username)
135 u := newUser(s, user)
136 s.users[u.Username] = u
137
[449]138 s.stopWG.Add(1)
139
[378]140 go func() {
141 u.run()
142
143 s.lock.Lock()
144 delete(s.users, u.Username)
145 s.lock.Unlock()
[449]146
147 s.stopWG.Done()
[378]148 }()
149
150 return u
151}
152
[323]153var lastDownstreamID uint64 = 0
154
[347]155func (s *Server) handle(ic ircConn) {
[323]156 id := atomic.AddUint64(&lastDownstreamID, 1)
[347]157 dc := newDownstreamConn(s, ic, id)
[323]158 if err := dc.runUntilRegistered(); err != nil {
159 dc.logger.Print(err)
160 } else {
161 dc.user.events <- eventDownstreamConnected{dc}
162 if err := dc.readMessages(dc.user.events); err != nil {
163 dc.logger.Print(err)
164 }
165 dc.user.events <- eventDownstreamDisconnected{dc}
166 }
167 dc.Close()
168}
169
[3]170func (s *Server) Serve(ln net.Listener) error {
[449]171 s.lock.Lock()
172 s.listeners[ln] = struct{}{}
173 s.lock.Unlock()
174
175 s.stopWG.Add(1)
176
177 defer func() {
178 s.lock.Lock()
179 delete(s.listeners, ln)
180 s.lock.Unlock()
181
182 s.stopWG.Done()
183 }()
184
[1]185 for {
[323]186 conn, err := ln.Accept()
[449]187 // TODO: use net.ErrClosed when available
188 if err != nil && strings.Contains(err.Error(), "use of closed network connection") {
189 return nil
190 } else if err != nil {
[1]191 return fmt.Errorf("failed to accept connection: %v", err)
192 }
193
[347]194 go s.handle(newNetIRCConn(conn))
[1]195 }
196}
[323]197
198func (s *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
199 conn, err := websocket.Accept(w, req, &websocket.AcceptOptions{
200 OriginPatterns: s.HTTPOrigins,
201 })
202 if err != nil {
203 s.Logger.Printf("failed to serve HTTP connection: %v", err)
204 return
205 }
[345]206
[370]207 isProxy := false
[345]208 if host, _, err := net.SplitHostPort(req.RemoteAddr); err == nil {
209 if ip := net.ParseIP(host); ip != nil {
[370]210 isProxy = s.AcceptProxyIPs.Contains(ip)
[345]211 }
212 }
213
[370]214 // Only trust X-Forwarded-* header fields if this is a trusted proxy IP
[345]215 // to prevent users from spoofing the remote address
[344]216 remoteAddr := req.RemoteAddr
217 forwardedHost := req.Header.Get("X-Forwarded-For")
218 forwardedPort := req.Header.Get("X-Forwarded-Port")
[370]219 if isProxy && forwardedHost != "" && forwardedPort != "" {
[344]220 remoteAddr = net.JoinHostPort(forwardedHost, forwardedPort)
221 }
[345]222
[347]223 s.handle(newWebsocketIRCConn(conn, remoteAddr))
[323]224}
Note: See TracBrowser for help on using the repository browser.