source: code/trunk/user.go@ 264

Last change on this file since 264 was 253, checked in by contact, 5 years ago

Per-entity ring buffers

Instead of having one ring buffer per network, each network has one ring
buffer per entity (channel or nick). This allows history to be more
fair: if there's a lot of activity in a channel, it won't prune activity
in other channels.

We now track history sequence numbers per client and per network in
networkHistory. The overall list of offline clients is still tracked in
network.offlineClients.

When all clients have received history, the ring buffer can be released.

In the future, we should get rid of too-old offline clients to avoid
having to maintain history for them forever. We should also add a
per-user limit on the number of ring buffers.

File size: 8.8 KB
RevLine 
[101]1package soju
2
3import (
[218]4 "fmt"
[101]5 "time"
[103]6
7 "gopkg.in/irc.v3"
[101]8)
9
[165]10type event interface{}
11
12type eventUpstreamMessage struct {
[103]13 msg *irc.Message
14 uc *upstreamConn
15}
16
[218]17type eventUpstreamConnectionError struct {
18 net *network
19 err error
20}
21
[196]22type eventUpstreamConnected struct {
23 uc *upstreamConn
24}
25
[179]26type eventUpstreamDisconnected struct {
27 uc *upstreamConn
28}
29
[218]30type eventUpstreamError struct {
31 uc *upstreamConn
32 err error
33}
34
[165]35type eventDownstreamMessage struct {
[103]36 msg *irc.Message
37 dc *downstreamConn
38}
39
[166]40type eventDownstreamConnected struct {
41 dc *downstreamConn
42}
43
[167]44type eventDownstreamDisconnected struct {
45 dc *downstreamConn
46}
47
[253]48type networkHistory struct {
49 offlineClients map[string]uint64 // indexed by client name
50 ring *Ring // can be nil if there are no offline clients
51}
52
[101]53type network struct {
54 Network
[202]55 user *user
56 stopped chan struct{}
[131]57
[253]58 conn *upstreamConn
59 history map[string]*networkHistory // indexed by entity
60 offlineClients map[string]struct{} // indexed by client name
61 lastError error
[101]62}
63
64func newNetwork(user *user, record *Network) *network {
65 return &network{
[253]66 Network: *record,
67 user: user,
68 stopped: make(chan struct{}),
69 history: make(map[string]*networkHistory),
70 offlineClients: make(map[string]struct{}),
[101]71 }
72}
73
[218]74func (net *network) forEachDownstream(f func(*downstreamConn)) {
75 net.user.forEachDownstream(func(dc *downstreamConn) {
76 if dc.network != nil && dc.network != net {
77 return
78 }
79 f(dc)
80 })
81}
82
[101]83func (net *network) run() {
84 var lastTry time.Time
85 for {
[202]86 select {
87 case <-net.stopped:
88 return
89 default:
90 // This space is intentionally left blank
91 }
92
[101]93 if dur := time.Now().Sub(lastTry); dur < retryConnectMinDelay {
94 delay := retryConnectMinDelay - dur
95 net.user.srv.Logger.Printf("waiting %v before trying to reconnect to %q", delay.Truncate(time.Second), net.Addr)
96 time.Sleep(delay)
97 }
98 lastTry = time.Now()
99
100 uc, err := connectToUpstream(net)
101 if err != nil {
102 net.user.srv.Logger.Printf("failed to connect to upstream server %q: %v", net.Addr, err)
[218]103 net.user.events <- eventUpstreamConnectionError{net, fmt.Errorf("failed to connect: %v", err)}
[101]104 continue
105 }
106
107 uc.register()
[197]108 if err := uc.runUntilRegistered(); err != nil {
109 uc.logger.Printf("failed to register: %v", err)
[218]110 net.user.events <- eventUpstreamConnectionError{net, fmt.Errorf("failed to register: %v", err)}
[197]111 uc.Close()
112 continue
113 }
[101]114
[196]115 net.user.events <- eventUpstreamConnected{uc}
[165]116 if err := uc.readMessages(net.user.events); err != nil {
[101]117 uc.logger.Printf("failed to handle messages: %v", err)
[218]118 net.user.events <- eventUpstreamError{uc, fmt.Errorf("failed to handle messages: %v", err)}
[101]119 }
120 uc.Close()
[179]121 net.user.events <- eventUpstreamDisconnected{uc}
[101]122 }
123}
124
[136]125func (net *network) upstream() *upstreamConn {
126 return net.conn
127}
128
[202]129func (net *network) Stop() {
130 select {
131 case <-net.stopped:
132 return
133 default:
134 close(net.stopped)
135 }
136
137 if uc := net.upstream(); uc != nil {
138 uc.Close()
139 }
140}
141
[222]142func (net *network) createUpdateChannel(ch *Channel) error {
143 if dbCh, err := net.user.srv.db.GetChannel(net.ID, ch.Name); err == nil {
144 ch.ID = dbCh.ID
145 } else if err != ErrNoSuchChannel {
146 return err
147 }
148 return net.user.srv.db.StoreChannel(net.ID, ch)
149}
150
151func (net *network) deleteChannel(name string) error {
152 return net.user.srv.db.DeleteChannel(net.ID, name)
153}
154
[101]155type user struct {
156 User
157 srv *Server
158
[165]159 events chan event
[103]160
[101]161 networks []*network
162 downstreamConns []*downstreamConn
[177]163
164 // LIST commands in progress
[179]165 pendingLISTs []pendingLIST
[101]166}
167
[177]168type pendingLIST struct {
169 downstreamID uint64
170 // list of per-upstream LIST commands not yet sent or completed
171 pendingCommands map[int64]*irc.Message
172}
173
[101]174func newUser(srv *Server, record *User) *user {
175 return &user{
[165]176 User: *record,
177 srv: srv,
178 events: make(chan event, 64),
[101]179 }
180}
181
182func (u *user) forEachNetwork(f func(*network)) {
183 for _, network := range u.networks {
184 f(network)
185 }
186}
187
188func (u *user) forEachUpstream(f func(uc *upstreamConn)) {
189 for _, network := range u.networks {
[136]190 uc := network.upstream()
[197]191 if uc == nil {
[101]192 continue
193 }
194 f(uc)
195 }
196}
197
198func (u *user) forEachDownstream(f func(dc *downstreamConn)) {
199 for _, dc := range u.downstreamConns {
200 f(dc)
201 }
202}
203
204func (u *user) getNetwork(name string) *network {
205 for _, network := range u.networks {
206 if network.Addr == name {
207 return network
208 }
[201]209 if network.Name != "" && network.Name == name {
210 return network
211 }
[101]212 }
213 return nil
214}
215
216func (u *user) run() {
217 networks, err := u.srv.db.ListNetworks(u.Username)
218 if err != nil {
219 u.srv.Logger.Printf("failed to list networks for user %q: %v", u.Username, err)
220 return
221 }
222
223 for _, record := range networks {
224 network := newNetwork(u, &record)
225 u.networks = append(u.networks, network)
226
227 go network.run()
228 }
[103]229
[165]230 for e := range u.events {
231 switch e := e.(type) {
[196]232 case eventUpstreamConnected:
[198]233 uc := e.uc
[199]234
235 uc.network.conn = uc
236
[198]237 uc.updateAway()
[218]238
239 uc.forEachDownstream(func(dc *downstreamConn) {
[223]240 sendServiceNOTICE(dc, fmt.Sprintf("connected to %s", uc.network.GetName()))
[218]241 })
242 uc.network.lastError = nil
[179]243 case eventUpstreamDisconnected:
244 uc := e.uc
[199]245
246 uc.network.conn = nil
247
[215]248 for _, ml := range uc.messageLoggers {
249 if err := ml.Close(); err != nil {
250 uc.logger.Printf("failed to close message logger: %v", err)
251 }
[179]252 }
[199]253
[181]254 uc.endPendingLISTs(true)
[218]255
256 if uc.network.lastError == nil {
257 uc.forEachDownstream(func(dc *downstreamConn) {
[223]258 sendServiceNOTICE(dc, fmt.Sprintf("disconnected from %s", uc.network.GetName()))
[218]259 })
260 }
261 case eventUpstreamConnectionError:
262 net := e.net
263
264 if net.lastError == nil || net.lastError.Error() != e.err.Error() {
265 net.forEachDownstream(func(dc *downstreamConn) {
[223]266 sendServiceNOTICE(dc, fmt.Sprintf("failed connecting/registering to %s: %v", net.GetName(), e.err))
[218]267 })
268 }
269 net.lastError = e.err
270 case eventUpstreamError:
271 uc := e.uc
272
273 uc.forEachDownstream(func(dc *downstreamConn) {
[223]274 sendServiceNOTICE(dc, fmt.Sprintf("disconnected from %s: %v", uc.network.GetName(), e.err))
[218]275 })
276 uc.network.lastError = e.err
[165]277 case eventUpstreamMessage:
278 msg, uc := e.msg, e.uc
[175]279 if uc.isClosed() {
[133]280 uc.logger.Printf("ignoring message on closed connection: %v", msg)
281 break
282 }
[103]283 if err := uc.handleMessage(msg); err != nil {
284 uc.logger.Printf("failed to handle message %q: %v", msg, err)
285 }
[166]286 case eventDownstreamConnected:
287 dc := e.dc
[168]288
289 if err := dc.welcome(); err != nil {
290 dc.logger.Printf("failed to handle new registered connection: %v", err)
291 break
292 }
293
[166]294 u.downstreamConns = append(u.downstreamConns, dc)
[198]295
296 u.forEachUpstream(func(uc *upstreamConn) {
297 uc.updateAway()
298 })
[167]299 case eventDownstreamDisconnected:
300 dc := e.dc
[204]301
[167]302 for i := range u.downstreamConns {
303 if u.downstreamConns[i] == dc {
304 u.downstreamConns = append(u.downstreamConns[:i], u.downstreamConns[i+1:]...)
305 break
306 }
307 }
[198]308
[253]309 // Save history if we're the last client with this name
310 skipHistory := make(map[*network]bool)
311 u.forEachDownstream(func(conn *downstreamConn) {
312 if dc.clientName == conn.clientName {
313 skipHistory[conn.network] = true
314 }
315 })
316
317 dc.forEachNetwork(func(net *network) {
318 if skipHistory[net] || skipHistory[nil] {
319 return
320 }
321
322 net.offlineClients[dc.clientName] = struct{}{}
323 for _, history := range net.history {
324 history.offlineClients[dc.clientName] = history.ring.Cur()
325 }
326 })
327
[198]328 u.forEachUpstream(func(uc *upstreamConn) {
329 uc.updateAway()
330 })
[165]331 case eventDownstreamMessage:
332 msg, dc := e.msg, e.dc
[133]333 if dc.isClosed() {
334 dc.logger.Printf("ignoring message on closed connection: %v", msg)
335 break
336 }
[103]337 err := dc.handleMessage(msg)
338 if ircErr, ok := err.(ircError); ok {
339 ircErr.Message.Prefix = dc.srv.prefix()
340 dc.SendMessage(ircErr.Message)
341 } else if err != nil {
342 dc.logger.Printf("failed to handle message %q: %v", msg, err)
343 dc.Close()
344 }
[165]345 default:
346 u.srv.Logger.Printf("received unknown event type: %T", e)
[103]347 }
348 }
[101]349}
350
[120]351func (u *user) createNetwork(net *Network) (*network, error) {
[144]352 if net.ID != 0 {
353 panic("tried creating an already-existing network")
354 }
355
[120]356 network := newNetwork(u, net)
[101]357 err := u.srv.db.StoreNetwork(u.Username, &network.Network)
358 if err != nil {
359 return nil, err
360 }
[144]361
[101]362 u.networks = append(u.networks, network)
[144]363
[101]364 go network.run()
365 return network, nil
366}
[202]367
368func (u *user) deleteNetwork(id int64) error {
369 for i, net := range u.networks {
370 if net.ID != id {
371 continue
372 }
373
374 if err := u.srv.db.DeleteNetwork(net.ID); err != nil {
375 return err
376 }
377
378 u.forEachDownstream(func(dc *downstreamConn) {
379 if dc.network != nil && dc.network == net {
380 dc.Close()
381 }
382 })
383
384 net.Stop()
385 u.networks = append(u.networks[:i], u.networks[i+1:]...)
386 return nil
387 }
388
389 panic("tried deleting a non-existing network")
390}
[252]391
392func (u *user) updatePassword(hashed string) error {
393 u.User.Password = hashed
394 return u.srv.db.UpdatePassword(&u.User)
395}
Note: See TracBrowser for help on using the repository browser.