source: code/trunk/server.go@ 563

Last change on this file since 563 was 563, checked in by contact, 4 years ago

Allow admins to broadcast a message to all bouncer users

Typically done via:

/notice $<bouncer> <message>

Or, for a connection not bound to a specific network:

/notice $* <message>

The message is broadcast as BouncerServ, because that's the only
user that users can trust to belong to the bouncer. Any
other prefix would conflict with the upstream network.

File size: 4.9 KB
Line 
1package soju
2
3import (
4 "fmt"
5 "log"
6 "mime"
7 "net"
8 "net/http"
9 "strings"
10 "sync"
11 "sync/atomic"
12 "time"
13
14 "gopkg.in/irc.v3"
15 "nhooyr.io/websocket"
16
17 "git.sr.ht/~emersion/soju/config"
18)
19
// Package-level timing and rate settings. None of them is referenced by the
// code visible in this file; NOTE(review): their exact semantics are inferred
// from the names only — confirm at the use sites.
//
// TODO: make configurable
var (
	retryConnectDelay    = time.Minute
	connectTimeout       = 15 * time.Second
	writeTimeout         = 10 * time.Second
	upstreamMessageDelay = 2 * time.Second
	upstreamMessageBurst = 10
)
26
// Logger is the minimal logging interface the server writes to. Its two
// methods have the same signatures as Print and Printf on the standard
// library's *log.Logger.
type Logger interface {
	Print(v ...interface{})
	Printf(format string, v ...interface{})
}

// prefixLogger is a Logger that forwards every message to an underlying
// Logger with a fixed prefix placed in front of it.
type prefixLogger struct {
	logger Logger
	prefix string
}

// Compile-time check that *prefixLogger implements Logger.
var _ Logger = (*prefixLogger)(nil)

// withPrefix returns a freshly allocated argument list consisting of the
// prefix followed by v. The caller's slice is never modified.
func (l *prefixLogger) withPrefix(v []interface{}) []interface{} {
	args := make([]interface{}, 0, len(v)+1)
	args = append(args, l.prefix)
	return append(args, v...)
}

// Print forwards v to the underlying logger's Print, with the prefix passed
// as the first operand.
func (l *prefixLogger) Print(v ...interface{}) {
	l.logger.Print(l.withPrefix(v)...)
}

// Printf forwards to the underlying logger's Printf. The prefix is passed as
// an extra leading argument and rendered through a "%v" verb prepended to
// format, so '%' characters inside the prefix are not interpreted as verbs.
func (l *prefixLogger) Printf(format string, v ...interface{}) {
	l.logger.Printf("%v"+format, l.withPrefix(v)...)
}
48
49type Server struct {
50 Hostname string
51 Logger Logger
52 HistoryLimit int
53 LogPath string
54 Debug bool
55 HTTPOrigins []string
56 AcceptProxyIPs config.IPSet
57 Identd *Identd // can be nil
58
59 db Database
60 stopWG sync.WaitGroup
61
62 lock sync.Mutex
63 listeners map[net.Listener]struct{}
64 users map[string]*user
65}
66
67func NewServer(db Database) *Server {
68 return &Server{
69 Logger: log.New(log.Writer(), "", log.LstdFlags),
70 HistoryLimit: 1000,
71 db: db,
72 listeners: make(map[net.Listener]struct{}),
73 users: make(map[string]*user),
74 }
75}
76
77func (s *Server) prefix() *irc.Prefix {
78 return &irc.Prefix{Name: s.Hostname}
79}
80
81func (s *Server) Start() error {
82 users, err := s.db.ListUsers()
83 if err != nil {
84 return err
85 }
86
87 s.lock.Lock()
88 for i := range users {
89 s.addUserLocked(&users[i])
90 }
91 s.lock.Unlock()
92
93 return nil
94}
95
96func (s *Server) Shutdown() {
97 s.lock.Lock()
98 for ln := range s.listeners {
99 if err := ln.Close(); err != nil {
100 s.Logger.Printf("failed to stop listener: %v", err)
101 }
102 }
103 for _, u := range s.users {
104 u.events <- eventStop{}
105 }
106 s.lock.Unlock()
107
108 s.stopWG.Wait()
109}
110
111func (s *Server) createUser(user *User) (*user, error) {
112 s.lock.Lock()
113 defer s.lock.Unlock()
114
115 if _, ok := s.users[user.Username]; ok {
116 return nil, fmt.Errorf("user %q already exists", user.Username)
117 }
118
119 err := s.db.StoreUser(user)
120 if err != nil {
121 return nil, fmt.Errorf("could not create user in db: %v", err)
122 }
123
124 return s.addUserLocked(user), nil
125}
126
127func (s *Server) forEachUser(f func(*user)) {
128 s.lock.Lock()
129 for _, u := range s.users {
130 f(u)
131 }
132 s.lock.Unlock()
133}
134
135func (s *Server) getUser(name string) *user {
136 s.lock.Lock()
137 u := s.users[name]
138 s.lock.Unlock()
139 return u
140}
141
142func (s *Server) addUserLocked(user *User) *user {
143 s.Logger.Printf("starting bouncer for user %q", user.Username)
144 u := newUser(s, user)
145 s.users[u.Username] = u
146
147 s.stopWG.Add(1)
148
149 go func() {
150 u.run()
151
152 s.lock.Lock()
153 delete(s.users, u.Username)
154 s.lock.Unlock()
155
156 s.stopWG.Done()
157 }()
158
159 return u
160}
161
// lastDownstreamID is the ID most recently handed out to a downstream
// connection. It starts at zero and is incremented atomically.
var lastDownstreamID uint64
163
164func (s *Server) handle(ic ircConn) {
165 id := atomic.AddUint64(&lastDownstreamID, 1)
166 dc := newDownstreamConn(s, ic, id)
167 if err := dc.runUntilRegistered(); err != nil {
168 dc.logger.Print(err)
169 } else {
170 dc.user.events <- eventDownstreamConnected{dc}
171 if err := dc.readMessages(dc.user.events); err != nil {
172 dc.logger.Print(err)
173 }
174 dc.user.events <- eventDownstreamDisconnected{dc}
175 }
176 dc.Close()
177}
178
179func (s *Server) Serve(ln net.Listener) error {
180 s.lock.Lock()
181 s.listeners[ln] = struct{}{}
182 s.lock.Unlock()
183
184 s.stopWG.Add(1)
185
186 defer func() {
187 s.lock.Lock()
188 delete(s.listeners, ln)
189 s.lock.Unlock()
190
191 s.stopWG.Done()
192 }()
193
194 for {
195 conn, err := ln.Accept()
196 // TODO: use net.ErrClosed when available
197 if err != nil && strings.Contains(err.Error(), "use of closed network connection") {
198 return nil
199 } else if err != nil {
200 return fmt.Errorf("failed to accept connection: %v", err)
201 }
202
203 go s.handle(newNetIRCConn(conn))
204 }
205}
206
207func (s *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
208 conn, err := websocket.Accept(w, req, &websocket.AcceptOptions{
209 OriginPatterns: s.HTTPOrigins,
210 })
211 if err != nil {
212 s.Logger.Printf("failed to serve HTTP connection: %v", err)
213 return
214 }
215
216 isProxy := false
217 if host, _, err := net.SplitHostPort(req.RemoteAddr); err == nil {
218 if ip := net.ParseIP(host); ip != nil {
219 isProxy = s.AcceptProxyIPs.Contains(ip)
220 }
221 }
222
223 // Only trust the Forwarded header field if this is a trusted proxy IP
224 // to prevent users from spoofing the remote address
225 remoteAddr := req.RemoteAddr
226 if isProxy {
227 forwarded := parseForwarded(req.Header)
228 if forwarded["for"] != "" {
229 remoteAddr = forwarded["for"]
230 }
231 }
232
233 s.handle(newWebsocketIRCConn(conn, remoteAddr))
234}
235
// parseForwarded extracts forwarding information from request headers.
//
// If a Forwarded header is present, its parameters are returned with
// lower-cased keys; when the header cannot be parsed the result is a nil map.
// Otherwise the result has exactly the keys "for", "proto" and "host", filled
// from X-Forwarded-For, X-Forwarded-Proto and X-Forwarded-Host (empty strings
// for absent headers).
//
// Header values are returned verbatim; nothing is validated or split here.
func parseForwarded(h http.Header) map[string]string {
	if value := h.Get("Forwarded"); value != "" {
		// Hack to easily parse header parameters: prefix a dummy media type
		// so the MIME parameter parser can be reused. The parse error is
		// deliberately ignored (best effort).
		_, params, _ := mime.ParseMediaType("hack; " + value)
		return params
	}

	legacy := make(map[string]string, 3)
	legacy["for"] = h.Get("X-Forwarded-For")
	legacy["proto"] = h.Get("X-Forwarded-Proto")
	legacy["host"] = h.Get("X-Forwarded-Host")
	return legacy
}
Note: See TracBrowser for help on using the repository browser.