source: code/trunk/user.go@ 478

Last change on this file since 478 was 478, checked in by hubert, 4 years ago

Implement casemapping

TL;DR: support for casemapping; logs are now saved in
casemapped/canonical/tolower form
(eg. in the #channel directory instead of #Channel... or something)

What is casemapping?

see <https://modern.ircdocs.horse/#casemapping-parameter>

Casemapping and multi-upstream

Since each upstream does not necessarily use the same casemapping, and
since casemappings cannot coexist [0],

  1. soju must also update the database according to each upstream's casemapping, otherwise it will end up inconsistent,
  2. soju must "normalize" entity names and expose only one casemapping that is a subset of all supported casemappings (here, ascii).

[0] On some upstreams, "emersion[m]" and "emersion{m}" refer to the same
user (upstreams that advertise rfc1459 for example), while on others
(upstreams that advertise ascii) they don't.

Once upstream's casemapping is known (default to rfc1459), entity names
in map keys are made into casemapped form, for upstreamConn,
upstreamChannel and network.

downstreamConn advertises "CASEMAPPING=ascii", and always casemaps map
keys with ascii.

Some functions require the caller to casemap their argument (to avoid
needless calls to casemapping functions).

Message forwarding and casemapping

downstream message handling (joins and parts basically):
When relaying entity names from downstreams to upstreams, soju uses the
upstream casemapping, in order to not get in the way of the user. This
does not cause any issue, as long as soju replies with the ascii
casemapping in mind (solves point 1.).

marshalEntity/marshalUserPrefix:
When relaying entity names from upstreams with non-ascii casemappings,
soju *partially* casemaps them: it only changes the case of characters
which are not ascii letters. ASCII case is thus kept intact, while
special symbols like []{} are the same every time soju sends them to
downstreams (solves point 2.).

Casemapping changes

Casemapping changes are not fully supported by this patch and will
result in loss of history. This is a limitation of the protocol and
should be solved by the RENAME spec.

File size: 14.7 KB
Line 
1package soju
2
3import (
4 "crypto/sha256"
5 "encoding/binary"
6 "encoding/hex"
7 "fmt"
8 "time"
9
10 "gopkg.in/irc.v3"
11)
12
// event is any value sent over a user's events channel and dispatched by the
// type switch in user.run.
type event interface{}
14
// eventUpstreamMessage carries an IRC message together with the upstream
// connection it belongs to. user.run hands msg to uc.handleMessage unless the
// connection is already closed.
type eventUpstreamMessage struct {
	msg *irc.Message
	uc  *upstreamConn
}
19
// eventUpstreamConnectionError reports that connecting or registering to the
// upstream server of net failed with err. It is sent by network.run.
type eventUpstreamConnectionError struct {
	net *network
	err error
}
24
// eventUpstreamConnected is sent by network.run once uc has successfully
// registered with its upstream server.
type eventUpstreamConnected struct {
	uc *upstreamConn
}
28
// eventUpstreamDisconnected is sent by network.run after uc has been closed.
type eventUpstreamDisconnected struct {
	uc *upstreamConn
}
32
// eventUpstreamError reports that reading messages from the registered
// upstream connection uc failed with err. It is sent by network.run.
type eventUpstreamError struct {
	uc  *upstreamConn
	err error
}
37
// eventDownstreamMessage carries an IRC message together with the downstream
// connection it belongs to. user.run hands msg to dc.handleMessage unless the
// connection is already closed.
type eventDownstreamMessage struct {
	msg *irc.Message
	dc  *downstreamConn
}
42
// eventDownstreamConnected asks user.run to welcome dc and add it to the
// user's list of downstream connections.
type eventDownstreamConnected struct {
	dc *downstreamConn
}
46
// eventDownstreamDisconnected asks user.run to remove dc from the user's list
// of downstream connections.
type eventDownstreamDisconnected struct {
	dc *downstreamConn
}
50
// eventChannelDetach asks user.run to detach the channel called name on the
// network of uc and to store the updated channel in the database.
type eventChannelDetach struct {
	uc   *upstreamConn
	name string
}
55
// eventStop is sent by user.stop; on receipt user.run closes every downstream
// connection, stops every network and returns.
type eventStop struct{}
57
// network is the in-memory state of one of a user's networks: the embedded
// database record plus the current upstream connection and per-network
// bookkeeping.
type network struct {
	Network
	user    *user
	stopped chan struct{} // closed by stop; polled by isStopped

	conn           *upstreamConn             // set by user.run on eventUpstreamConnected, nil while disconnected
	channels       channelCasemapMap         // saved channels, keyed by channel name
	delivered      mapStringStringCasemapMap // entity -> client name -> msg ID
	offlineClients map[string]struct{}       // indexed by client name
	lastError      error                     // last connection/upstream error; reset to nil on successful connect
	casemap        casemapping               // starts as casemapRFC1459; replaced by updateCasemapping
}
70
71func newNetwork(user *user, record *Network, channels []Channel) *network {
72 m := channelCasemapMap{newCasemapMap(0)}
73 for _, ch := range channels {
74 ch := ch
75 m.SetValue(ch.Name, &ch)
76 }
77
78 return &network{
79 Network: *record,
80 user: user,
81 stopped: make(chan struct{}),
82 channels: m,
83 delivered: mapStringStringCasemapMap{newCasemapMap(0)},
84 offlineClients: make(map[string]struct{}),
85 casemap: casemapRFC1459,
86 }
87}
88
89func (net *network) forEachDownstream(f func(*downstreamConn)) {
90 net.user.forEachDownstream(func(dc *downstreamConn) {
91 if dc.network != nil && dc.network != net {
92 return
93 }
94 f(dc)
95 })
96}
97
98func (net *network) isStopped() bool {
99 select {
100 case <-net.stopped:
101 return true
102 default:
103 return false
104 }
105}
106
107func userIdent(u *User) string {
108 // The ident is a string we will send to upstream servers in clear-text.
109 // For privacy reasons, make sure it doesn't expose any meaningful user
110 // metadata. We just use the base64-encoded hashed ID, so that people don't
111 // start relying on the string being an integer or following a pattern.
112 var b [64]byte
113 binary.LittleEndian.PutUint64(b[:], uint64(u.ID))
114 h := sha256.Sum256(b[:])
115 return hex.EncodeToString(h[:16])
116}
117
118func (net *network) run() {
119 var lastTry time.Time
120 for {
121 if net.isStopped() {
122 return
123 }
124
125 if dur := time.Now().Sub(lastTry); dur < retryConnectDelay {
126 delay := retryConnectDelay - dur
127 net.user.srv.Logger.Printf("waiting %v before trying to reconnect to %q", delay.Truncate(time.Second), net.Addr)
128 time.Sleep(delay)
129 }
130 lastTry = time.Now()
131
132 uc, err := connectToUpstream(net)
133 if err != nil {
134 net.user.srv.Logger.Printf("failed to connect to upstream server %q: %v", net.Addr, err)
135 net.user.events <- eventUpstreamConnectionError{net, fmt.Errorf("failed to connect: %v", err)}
136 continue
137 }
138
139 if net.user.srv.Identd != nil {
140 net.user.srv.Identd.Store(uc.RemoteAddr().String(), uc.LocalAddr().String(), userIdent(&net.user.User))
141 }
142
143 uc.register()
144 if err := uc.runUntilRegistered(); err != nil {
145 text := err.Error()
146 if regErr, ok := err.(registrationError); ok {
147 text = string(regErr)
148 }
149 uc.logger.Printf("failed to register: %v", text)
150 net.user.events <- eventUpstreamConnectionError{net, fmt.Errorf("failed to register: %v", text)}
151 uc.Close()
152 continue
153 }
154
155 // TODO: this is racy with net.stopped. If the network is stopped
156 // before the user goroutine receives eventUpstreamConnected, the
157 // connection won't be closed.
158 net.user.events <- eventUpstreamConnected{uc}
159 if err := uc.readMessages(net.user.events); err != nil {
160 uc.logger.Printf("failed to handle messages: %v", err)
161 net.user.events <- eventUpstreamError{uc, fmt.Errorf("failed to handle messages: %v", err)}
162 }
163 uc.Close()
164 net.user.events <- eventUpstreamDisconnected{uc}
165
166 if net.user.srv.Identd != nil {
167 net.user.srv.Identd.Delete(uc.RemoteAddr().String(), uc.LocalAddr().String())
168 }
169 }
170}
171
172func (net *network) stop() {
173 if !net.isStopped() {
174 close(net.stopped)
175 }
176
177 if net.conn != nil {
178 net.conn.Close()
179 }
180}
181
182func (net *network) detach(ch *Channel) {
183 if ch.Detached {
184 return
185 }
186 ch.Detached = true
187 net.user.srv.Logger.Printf("network %q: detaching channel %q", net.GetName(), ch.Name)
188
189 if net.conn != nil {
190 uch := net.conn.channels.Value(ch.Name)
191 if uch != nil {
192 uch.updateAutoDetach(0)
193 }
194 }
195
196 net.forEachDownstream(func(dc *downstreamConn) {
197 net.offlineClients[dc.clientName] = struct{}{}
198
199 dc.SendMessage(&irc.Message{
200 Prefix: dc.prefix(),
201 Command: "PART",
202 Params: []string{dc.marshalEntity(net, ch.Name), "Detach"},
203 })
204 })
205}
206
207func (net *network) attach(ch *Channel) {
208 if !ch.Detached {
209 return
210 }
211 ch.Detached = false
212 net.user.srv.Logger.Printf("network %q: attaching channel %q", net.GetName(), ch.Name)
213
214 var uch *upstreamChannel
215 if net.conn != nil {
216 uch = net.conn.channels.Value(ch.Name)
217
218 net.conn.updateChannelAutoDetach(ch.Name)
219 }
220
221 net.forEachDownstream(func(dc *downstreamConn) {
222 dc.SendMessage(&irc.Message{
223 Prefix: dc.prefix(),
224 Command: "JOIN",
225 Params: []string{dc.marshalEntity(net, ch.Name)},
226 })
227
228 if uch != nil {
229 forwardChannel(dc, uch)
230 }
231
232 dc.sendTargetBacklog(net, ch.Name)
233 })
234}
235
236func (net *network) deleteChannel(name string) error {
237 ch := net.channels.Value(name)
238 if ch == nil {
239 return fmt.Errorf("unknown channel %q", name)
240 }
241 if net.conn != nil {
242 uch := net.conn.channels.Value(ch.Name)
243 if uch != nil {
244 uch.updateAutoDetach(0)
245 }
246 }
247
248 if err := net.user.srv.db.DeleteChannel(ch.ID); err != nil {
249 return err
250 }
251 net.channels.Delete(name)
252 return nil
253}
254
255func (net *network) updateCasemapping(newCasemap casemapping) {
256 net.casemap = newCasemap
257 net.channels.SetCasemapping(newCasemap)
258 net.delivered.SetCasemapping(newCasemap)
259 if net.conn != nil {
260 net.conn.channels.SetCasemapping(newCasemap)
261 for _, entry := range net.conn.channels.innerMap {
262 uch := entry.value.(*upstreamChannel)
263 uch.Members.SetCasemapping(newCasemap)
264 }
265 }
266}
267
// user is the in-memory state of one bouncer user: the embedded database
// record plus the user's networks, downstream connections and message store.
// The fields are driven from the event loop in run.
type user struct {
	User
	srv *Server

	events chan event    // events dispatched by run
	done   chan struct{} // closed when run returns

	networks        []*network
	downstreamConns []*downstreamConn
	msgStore        messageStore

	// LIST commands in progress
	pendingLISTs []pendingLIST
}
282
// pendingLIST tracks one LIST command that is still in progress.
type pendingLIST struct {
	downstreamID uint64
	// list of per-upstream LIST commands not yet sent or completed
	// NOTE(review): the int64 key is presumably the network ID - confirm
	// against the code that fills this map.
	pendingCommands map[int64]*irc.Message
}
288
289func newUser(srv *Server, record *User) *user {
290 var msgStore messageStore
291 if srv.LogPath != "" {
292 msgStore = newFSMessageStore(srv.LogPath, record.Username)
293 } else {
294 msgStore = newMemoryMessageStore()
295 }
296
297 return &user{
298 User: *record,
299 srv: srv,
300 events: make(chan event, 64),
301 done: make(chan struct{}),
302 msgStore: msgStore,
303 }
304}
305
306func (u *user) forEachNetwork(f func(*network)) {
307 for _, network := range u.networks {
308 f(network)
309 }
310}
311
312func (u *user) forEachUpstream(f func(uc *upstreamConn)) {
313 for _, network := range u.networks {
314 if network.conn == nil {
315 continue
316 }
317 f(network.conn)
318 }
319}
320
321func (u *user) forEachDownstream(f func(dc *downstreamConn)) {
322 for _, dc := range u.downstreamConns {
323 f(dc)
324 }
325}
326
327func (u *user) getNetwork(name string) *network {
328 for _, network := range u.networks {
329 if network.Addr == name {
330 return network
331 }
332 if network.Name != "" && network.Name == name {
333 return network
334 }
335 }
336 return nil
337}
338
339func (u *user) getNetworkByID(id int64) *network {
340 for _, net := range u.networks {
341 if net.ID == id {
342 return net
343 }
344 }
345 return nil
346}
347
// run is the user's event loop. It loads the user's networks and their
// channels from the database, starts one goroutine per network, then
// processes the values received on u.events until an eventStop arrives or the
// channel is closed. When run returns, the message store is closed and u.done
// is closed.
func (u *user) run() {
	defer func() {
		if u.msgStore != nil {
			if err := u.msgStore.Close(); err != nil {
				u.srv.Logger.Printf("failed to close message store for user %q: %v", u.Username, err)
			}
		}
		// Unblocks stop, which waits on u.done
		close(u.done)
	}()

	networks, err := u.srv.db.ListNetworks(u.ID)
	if err != nil {
		u.srv.Logger.Printf("failed to list networks for user %q: %v", u.Username, err)
		return
	}

	for _, record := range networks {
		// Per-iteration copy: newNetwork is handed a pointer to it
		record := record
		channels, err := u.srv.db.ListChannels(record.ID)
		if err != nil {
			// A network whose channels can't be listed is skipped entirely
			u.srv.Logger.Printf("failed to list channels for user %q, network %q: %v", u.Username, record.GetName(), err)
			continue
		}

		network := newNetwork(u, &record, channels)
		u.networks = append(u.networks, network)

		go network.run()
	}

	for e := range u.events {
		switch e := e.(type) {
		case eventUpstreamConnected:
			uc := e.uc

			// From now on this is the network's current upstream connection
			uc.network.conn = uc

			uc.updateAway()

			uc.forEachDownstream(func(dc *downstreamConn) {
				dc.updateSupportedCaps()
				sendServiceNOTICE(dc, fmt.Sprintf("connected to %s", uc.network.GetName()))

				dc.updateNick()
			})
			uc.network.lastError = nil
		case eventUpstreamDisconnected:
			u.handleUpstreamDisconnected(e.uc)
		case eventUpstreamConnectionError:
			net := e.net

			// Non-blocking check of whether the network has been stopped
			stopped := false
			select {
			case <-net.stopped:
				stopped = true
			default:
			}

			// Only notify downstreams when the network is still running and
			// the error text differs from the previously recorded one
			if !stopped && (net.lastError == nil || net.lastError.Error() != e.err.Error()) {
				net.forEachDownstream(func(dc *downstreamConn) {
					sendServiceNOTICE(dc, fmt.Sprintf("failed connecting/registering to %s: %v", net.GetName(), e.err))
				})
			}
			net.lastError = e.err
		case eventUpstreamError:
			uc := e.uc

			uc.forEachDownstream(func(dc *downstreamConn) {
				sendServiceNOTICE(dc, fmt.Sprintf("disconnected from %s: %v", uc.network.GetName(), e.err))
			})
			uc.network.lastError = e.err
		case eventUpstreamMessage:
			msg, uc := e.msg, e.uc
			if uc.isClosed() {
				uc.logger.Printf("ignoring message on closed connection: %v", msg)
				break
			}
			if err := uc.handleMessage(msg); err != nil {
				uc.logger.Printf("failed to handle message %q: %v", msg, err)
			}
		case eventChannelDetach:
			uc, name := e.uc, e.name
			c := uc.network.channels.Value(name)
			if c == nil || c.Detached {
				// Unknown or already detached channel: move on to the next
				// event
				continue
			}
			uc.network.detach(c)
			if err := uc.srv.db.StoreChannel(uc.network.ID, c); err != nil {
				u.srv.Logger.Printf("failed to store updated detached channel %q: %v", c.Name, err)
			}
		case eventDownstreamConnected:
			dc := e.dc

			// The connection is only registered with the user once welcome
			// has succeeded
			if err := dc.welcome(); err != nil {
				dc.logger.Printf("failed to handle new registered connection: %v", err)
				break
			}

			u.downstreamConns = append(u.downstreamConns, dc)

			// Tell the new client about networks that have a recorded error
			dc.forEachNetwork(func(network *network) {
				if network.lastError != nil {
					sendServiceNOTICE(dc, fmt.Sprintf("disconnected from %s: %v", network.GetName(), network.lastError))
				}
			})

			u.forEachUpstream(func(uc *upstreamConn) {
				uc.updateAway()
			})
		case eventDownstreamDisconnected:
			dc := e.dc

			// Drop dc from the list of downstream connections
			for i := range u.downstreamConns {
				if u.downstreamConns[i] == dc {
					u.downstreamConns = append(u.downstreamConns[:i], u.downstreamConns[i+1:]...)
					break
				}
			}

			// Save history if we're the last client with this name
			// skipHistory holds the network of every remaining connection
			// that uses the same client name; the nil key is set by a
			// remaining connection that is not bound to a network.
			skipHistory := make(map[*network]bool)
			u.forEachDownstream(func(conn *downstreamConn) {
				if dc.clientName == conn.clientName {
					skipHistory[conn.network] = true
				}
			})

			dc.forEachNetwork(func(net *network) {
				if skipHistory[net] || skipHistory[nil] {
					return
				}

				// No other connection with this client name covers net:
				// record the client as offline for it
				net.offlineClients[dc.clientName] = struct{}{}
			})

			u.forEachUpstream(func(uc *upstreamConn) {
				uc.updateAway()
			})
		case eventDownstreamMessage:
			msg, dc := e.msg, e.dc
			if dc.isClosed() {
				dc.logger.Printf("ignoring message on closed connection: %v", msg)
				break
			}
			err := dc.handleMessage(msg)
			if ircErr, ok := err.(ircError); ok {
				// An ircError is sent back to the client; any other error
				// closes the connection
				ircErr.Message.Prefix = dc.srv.prefix()
				dc.SendMessage(ircErr.Message)
			} else if err != nil {
				dc.logger.Printf("failed to handle message %q: %v", msg, err)
				dc.Close()
			}
		case eventStop:
			u.forEachDownstream(func(dc *downstreamConn) {
				dc.Close()
			})
			for _, n := range u.networks {
				n.stop()
			}
			return
		default:
			u.srv.Logger.Printf("received unknown event type: %T", e)
		}
	}
}
513
514func (u *user) handleUpstreamDisconnected(uc *upstreamConn) {
515 uc.network.conn = nil
516
517 uc.endPendingLISTs(true)
518
519 for _, entry := range uc.channels.innerMap {
520 uch := entry.value.(*upstreamChannel)
521 uch.updateAutoDetach(0)
522 }
523
524 uc.forEachDownstream(func(dc *downstreamConn) {
525 dc.updateSupportedCaps()
526 })
527
528 if uc.network.lastError == nil {
529 uc.forEachDownstream(func(dc *downstreamConn) {
530 sendServiceNOTICE(dc, fmt.Sprintf("disconnected from %s", uc.network.GetName()))
531 })
532 }
533}
534
535func (u *user) addNetwork(network *network) {
536 u.networks = append(u.networks, network)
537 go network.run()
538}
539
540func (u *user) removeNetwork(network *network) {
541 network.stop()
542
543 u.forEachDownstream(func(dc *downstreamConn) {
544 if dc.network != nil && dc.network == network {
545 dc.Close()
546 }
547 })
548
549 for i, net := range u.networks {
550 if net == network {
551 u.networks = append(u.networks[:i], u.networks[i+1:]...)
552 return
553 }
554 }
555
556 panic("tried to remove a non-existing network")
557}
558
559func (u *user) createNetwork(record *Network) (*network, error) {
560 if record.ID != 0 {
561 panic("tried creating an already-existing network")
562 }
563
564 network := newNetwork(u, record, nil)
565 err := u.srv.db.StoreNetwork(u.ID, &network.Network)
566 if err != nil {
567 return nil, err
568 }
569
570 u.addNetwork(network)
571
572 return network, nil
573}
574
575func (u *user) updateNetwork(record *Network) (*network, error) {
576 if record.ID == 0 {
577 panic("tried updating a new network")
578 }
579
580 network := u.getNetworkByID(record.ID)
581 if network == nil {
582 panic("tried updating a non-existing network")
583 }
584
585 if err := u.srv.db.StoreNetwork(u.ID, record); err != nil {
586 return nil, err
587 }
588
589 // Most network changes require us to re-connect to the upstream server
590
591 channels := make([]Channel, 0, network.channels.Len())
592 for _, entry := range network.channels.innerMap {
593 ch := entry.value.(*Channel)
594 channels = append(channels, *ch)
595 }
596
597 updatedNetwork := newNetwork(u, record, channels)
598
599 // If we're currently connected, disconnect and perform the necessary
600 // bookkeeping
601 if network.conn != nil {
602 network.stop()
603 // Note: this will set network.conn to nil
604 u.handleUpstreamDisconnected(network.conn)
605 }
606
607 // Patch downstream connections to use our fresh updated network
608 u.forEachDownstream(func(dc *downstreamConn) {
609 if dc.network != nil && dc.network == network {
610 dc.network = updatedNetwork
611 }
612 })
613
614 // We need to remove the network after patching downstream connections,
615 // otherwise they'll get closed
616 u.removeNetwork(network)
617
618 // This will re-connect to the upstream server
619 u.addNetwork(updatedNetwork)
620
621 return updatedNetwork, nil
622}
623
624func (u *user) deleteNetwork(id int64) error {
625 network := u.getNetworkByID(id)
626 if network == nil {
627 panic("tried deleting a non-existing network")
628 }
629
630 if err := u.srv.db.DeleteNetwork(network.ID); err != nil {
631 return err
632 }
633
634 u.removeNetwork(network)
635 return nil
636}
637
638func (u *user) updatePassword(hashed string) error {
639 u.User.Password = hashed
640 return u.srv.db.StoreUser(&u.User)
641}
642
643func (u *user) stop() {
644 u.events <- eventStop{}
645 <-u.done
646}
Note: See TracBrowser for help on using the repository browser.