source: code/trunk/server.go@ 447

Last change on this file since 447 was 409, checked in by contact, 5 years ago

Nuke in-memory ring buffer

Instead, always read chat history from logs. Unify the implicit chat
history (pushing history to clients) and explicit chat history
(via the CHATHISTORY command).

Instead of keeping track of ring buffer cursors for each client, use
message IDs.

If necessary, the ring buffer could be re-introduced behind a
common MessageStore interface (could be useful when on-disk logs are
disabled).

References: https://todo.sr.ht/~emersion/soju/80

File size: 3.8 KB
RevLine 
[98]1package soju
[1]2
3import (
4 "fmt"
[37]5 "log"
[1]6 "net"
[323]7 "net/http"
[24]8 "sync"
[323]9 "sync/atomic"
[67]10 "time"
[1]11
12 "gopkg.in/irc.v3"
[323]13 "nhooyr.io/websocket"
[370]14
15 "git.sr.ht/~emersion/soju/config"
[1]16)
17
[67]18// TODO: make configurable
[398]19var retryConnectDelay = time.Minute
[206]20var connectTimeout = 15 * time.Second
[205]21var writeTimeout = 10 * time.Second
[398]22var upstreamMessageDelay = 2 * time.Second
23var upstreamMessageBurst = 10
[67]24
[9]25type Logger interface {
26 Print(v ...interface{})
27 Printf(format string, v ...interface{})
28}
29
[21]30type prefixLogger struct {
31 logger Logger
32 prefix string
33}
34
35var _ Logger = (*prefixLogger)(nil)
36
37func (l *prefixLogger) Print(v ...interface{}) {
38 v = append([]interface{}{l.prefix}, v...)
39 l.logger.Print(v...)
40}
41
42func (l *prefixLogger) Printf(format string, v ...interface{}) {
43 v = append([]interface{}{l.prefix}, v...)
44 l.logger.Printf("%v"+format, v...)
45}
46
[10]47type Server struct {
[370]48 Hostname string
49 Logger Logger
50 HistoryLimit int
51 LogPath string
52 Debug bool
53 HTTPOrigins []string
54 AcceptProxyIPs config.IPSet
[385]55 Identd *Identd // can be nil
[22]56
[77]57 db *DB
58
[172]59 lock sync.Mutex
60 users map[string]*user
[10]61}
62
[77]63func NewServer(db *DB) *Server {
[37]64 return &Server{
[319]65 Logger: log.New(log.Writer(), "", log.LstdFlags),
66 HistoryLimit: 1000,
67 users: make(map[string]*user),
68 db: db,
[37]69 }
70}
71
[5]72func (s *Server) prefix() *irc.Prefix {
73 return &irc.Prefix{Name: s.Hostname}
74}
75
[77]76func (s *Server) Run() error {
77 users, err := s.db.ListUsers()
78 if err != nil {
79 return err
80 }
[71]81
[77]82 s.lock.Lock()
[378]83 for i := range users {
84 s.addUserLocked(&users[i])
[71]85 }
[37]86 s.lock.Unlock()
87
[77]88 select {}
[10]89}
90
[329]91func (s *Server) createUser(user *User) (*user, error) {
92 s.lock.Lock()
93 defer s.lock.Unlock()
94
95 if _, ok := s.users[user.Username]; ok {
96 return nil, fmt.Errorf("user %q already exists", user.Username)
97 }
98
99 err := s.db.StoreUser(user)
100 if err != nil {
101 return nil, fmt.Errorf("could not create user in db: %v", err)
102 }
103
[378]104 return s.addUserLocked(user), nil
[329]105}
106
[38]107func (s *Server) getUser(name string) *user {
108 s.lock.Lock()
109 u := s.users[name]
110 s.lock.Unlock()
111 return u
112}
113
[378]114func (s *Server) addUserLocked(user *User) *user {
115 s.Logger.Printf("starting bouncer for user %q", user.Username)
116 u := newUser(s, user)
117 s.users[u.Username] = u
118
119 go func() {
120 u.run()
121
122 s.lock.Lock()
123 delete(s.users, u.Username)
124 s.lock.Unlock()
125 }()
126
127 return u
128}
129
[323]130var lastDownstreamID uint64 = 0
131
[347]132func (s *Server) handle(ic ircConn) {
[323]133 id := atomic.AddUint64(&lastDownstreamID, 1)
[347]134 dc := newDownstreamConn(s, ic, id)
[323]135 if err := dc.runUntilRegistered(); err != nil {
136 dc.logger.Print(err)
137 } else {
138 dc.user.events <- eventDownstreamConnected{dc}
139 if err := dc.readMessages(dc.user.events); err != nil {
140 dc.logger.Print(err)
141 }
142 dc.user.events <- eventDownstreamDisconnected{dc}
143 }
144 dc.Close()
145}
146
[3]147func (s *Server) Serve(ln net.Listener) error {
[1]148 for {
[323]149 conn, err := ln.Accept()
[1]150 if err != nil {
151 return fmt.Errorf("failed to accept connection: %v", err)
152 }
153
[347]154 go s.handle(newNetIRCConn(conn))
[1]155 }
156}
[323]157
158func (s *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
159 conn, err := websocket.Accept(w, req, &websocket.AcceptOptions{
160 OriginPatterns: s.HTTPOrigins,
[348]161 Subprotocols: []string{"irc"},
[323]162 })
163 if err != nil {
164 s.Logger.Printf("failed to serve HTTP connection: %v", err)
165 return
166 }
[345]167
[370]168 isProxy := false
[345]169 if host, _, err := net.SplitHostPort(req.RemoteAddr); err == nil {
170 if ip := net.ParseIP(host); ip != nil {
[370]171 isProxy = s.AcceptProxyIPs.Contains(ip)
[345]172 }
173 }
174
[370]175 // Only trust X-Forwarded-* header fields if this is a trusted proxy IP
[345]176 // to prevent users from spoofing the remote address
[344]177 remoteAddr := req.RemoteAddr
178 forwardedHost := req.Header.Get("X-Forwarded-For")
179 forwardedPort := req.Header.Get("X-Forwarded-Port")
[370]180 if isProxy && forwardedHost != "" && forwardedPort != "" {
[344]181 remoteAddr = net.JoinHostPort(forwardedHost, forwardedPort)
182 }
[345]183
[347]184 s.handle(newWebsocketIRCConn(conn, remoteAddr))
[323]185}
Note: See TracBrowser for help on using the repository browser.