source: code/trunk/server.go@ 430

Last change on this file since 430 was 409, checked in by contact, 5 years ago

Nuke in-memory ring buffer

Instead, always read chat history from logs. Unify the implicit chat
history (pushing history to clients) and explicit chat history
(via the CHATHISTORY command).

Instead of keeping track of ring buffer cursors for each client, use
message IDs.

If necessary, the ring buffer could be re-introduced behind a
common MessageStore interface (could be useful when on-disk logs are
disabled).

References: https://todo.sr.ht/~emersion/soju/80

File size: 3.8 KB
Line 
1package soju
2
3import (
4 "fmt"
5 "log"
6 "net"
7 "net/http"
8 "sync"
9 "sync/atomic"
10 "time"
11
12 "gopkg.in/irc.v3"
13 "nhooyr.io/websocket"
14
15 "git.sr.ht/~emersion/soju/config"
16)
17
18// TODO: make configurable
19var retryConnectDelay = time.Minute
20var connectTimeout = 15 * time.Second
21var writeTimeout = 10 * time.Second
22var upstreamMessageDelay = 2 * time.Second
23var upstreamMessageBurst = 10
24
25type Logger interface {
26 Print(v ...interface{})
27 Printf(format string, v ...interface{})
28}
29
30type prefixLogger struct {
31 logger Logger
32 prefix string
33}
34
35var _ Logger = (*prefixLogger)(nil)
36
37func (l *prefixLogger) Print(v ...interface{}) {
38 v = append([]interface{}{l.prefix}, v...)
39 l.logger.Print(v...)
40}
41
42func (l *prefixLogger) Printf(format string, v ...interface{}) {
43 v = append([]interface{}{l.prefix}, v...)
44 l.logger.Printf("%v"+format, v...)
45}
46
47type Server struct {
48 Hostname string
49 Logger Logger
50 HistoryLimit int
51 LogPath string
52 Debug bool
53 HTTPOrigins []string
54 AcceptProxyIPs config.IPSet
55 Identd *Identd // can be nil
56
57 db *DB
58
59 lock sync.Mutex
60 users map[string]*user
61}
62
63func NewServer(db *DB) *Server {
64 return &Server{
65 Logger: log.New(log.Writer(), "", log.LstdFlags),
66 HistoryLimit: 1000,
67 users: make(map[string]*user),
68 db: db,
69 }
70}
71
72func (s *Server) prefix() *irc.Prefix {
73 return &irc.Prefix{Name: s.Hostname}
74}
75
76func (s *Server) Run() error {
77 users, err := s.db.ListUsers()
78 if err != nil {
79 return err
80 }
81
82 s.lock.Lock()
83 for i := range users {
84 s.addUserLocked(&users[i])
85 }
86 s.lock.Unlock()
87
88 select {}
89}
90
91func (s *Server) createUser(user *User) (*user, error) {
92 s.lock.Lock()
93 defer s.lock.Unlock()
94
95 if _, ok := s.users[user.Username]; ok {
96 return nil, fmt.Errorf("user %q already exists", user.Username)
97 }
98
99 err := s.db.StoreUser(user)
100 if err != nil {
101 return nil, fmt.Errorf("could not create user in db: %v", err)
102 }
103
104 return s.addUserLocked(user), nil
105}
106
107func (s *Server) getUser(name string) *user {
108 s.lock.Lock()
109 u := s.users[name]
110 s.lock.Unlock()
111 return u
112}
113
114func (s *Server) addUserLocked(user *User) *user {
115 s.Logger.Printf("starting bouncer for user %q", user.Username)
116 u := newUser(s, user)
117 s.users[u.Username] = u
118
119 go func() {
120 u.run()
121
122 s.lock.Lock()
123 delete(s.users, u.Username)
124 s.lock.Unlock()
125 }()
126
127 return u
128}
129
130var lastDownstreamID uint64 = 0
131
132func (s *Server) handle(ic ircConn) {
133 id := atomic.AddUint64(&lastDownstreamID, 1)
134 dc := newDownstreamConn(s, ic, id)
135 if err := dc.runUntilRegistered(); err != nil {
136 dc.logger.Print(err)
137 } else {
138 dc.user.events <- eventDownstreamConnected{dc}
139 if err := dc.readMessages(dc.user.events); err != nil {
140 dc.logger.Print(err)
141 }
142 dc.user.events <- eventDownstreamDisconnected{dc}
143 }
144 dc.Close()
145}
146
147func (s *Server) Serve(ln net.Listener) error {
148 for {
149 conn, err := ln.Accept()
150 if err != nil {
151 return fmt.Errorf("failed to accept connection: %v", err)
152 }
153
154 go s.handle(newNetIRCConn(conn))
155 }
156}
157
158func (s *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
159 conn, err := websocket.Accept(w, req, &websocket.AcceptOptions{
160 OriginPatterns: s.HTTPOrigins,
161 Subprotocols: []string{"irc"},
162 })
163 if err != nil {
164 s.Logger.Printf("failed to serve HTTP connection: %v", err)
165 return
166 }
167
168 isProxy := false
169 if host, _, err := net.SplitHostPort(req.RemoteAddr); err == nil {
170 if ip := net.ParseIP(host); ip != nil {
171 isProxy = s.AcceptProxyIPs.Contains(ip)
172 }
173 }
174
175 // Only trust X-Forwarded-* header fields if this is a trusted proxy IP
176 // to prevent users from spoofing the remote address
177 remoteAddr := req.RemoteAddr
178 forwardedHost := req.Header.Get("X-Forwarded-For")
179 forwardedPort := req.Header.Get("X-Forwarded-Port")
180 if isProxy && forwardedHost != "" && forwardedPort != "" {
181 remoteAddr = net.JoinHostPort(forwardedHost, forwardedPort)
182 }
183
184 s.handle(newWebsocketIRCConn(conn, remoteAddr))
185}
Note: See TracBrowser for help on using the repository browser.