source: code/trunk/downstream.go@ 79

Last change on this file since 79 was 77, checked in by contact, 5 years ago

Add SQLite database

Closes: https://todo.sr.ht/~emersion/jounce/9

File size: 11.4 KB
Line 
1package jounce
2
3import (
4 "fmt"
5 "io"
6 "net"
7 "strings"
8
9 "gopkg.in/irc.v3"
10)
11
12type ircError struct {
13 Message *irc.Message
14}
15
16func newUnknownCommandError(cmd string) ircError {
17 return ircError{&irc.Message{
18 Command: irc.ERR_UNKNOWNCOMMAND,
19 Params: []string{
20 "*",
21 cmd,
22 "Unknown command",
23 },
24 }}
25}
26
27func newNeedMoreParamsError(cmd string) ircError {
28 return ircError{&irc.Message{
29 Command: irc.ERR_NEEDMOREPARAMS,
30 Params: []string{
31 "*",
32 cmd,
33 "Not enough parameters",
34 },
35 }}
36}
37
38func (err ircError) Error() string {
39 return err.Message.String()
40}
41
42type consumption struct {
43 consumer *RingConsumer
44 upstreamConn *upstreamConn
45}
46
47type downstreamConn struct {
48 net net.Conn
49 irc *irc.Conn
50 srv *Server
51 logger Logger
52 messages chan *irc.Message
53 consumptions chan consumption
54 closed chan struct{}
55
56 registered bool
57 user *user
58 nick string
59 username string
60 realname string
61 network *network // can be nil
62}
63
64func newDownstreamConn(srv *Server, netConn net.Conn) *downstreamConn {
65 dc := &downstreamConn{
66 net: netConn,
67 irc: irc.NewConn(netConn),
68 srv: srv,
69 logger: &prefixLogger{srv.Logger, fmt.Sprintf("downstream %q: ", netConn.RemoteAddr())},
70 messages: make(chan *irc.Message, 64),
71 consumptions: make(chan consumption),
72 closed: make(chan struct{}),
73 }
74
75 go func() {
76 if err := dc.writeMessages(); err != nil {
77 dc.logger.Printf("failed to write message: %v", err)
78 }
79 if err := dc.net.Close(); err != nil {
80 dc.logger.Printf("failed to close connection: %v", err)
81 } else {
82 dc.logger.Printf("connection closed")
83 }
84 }()
85
86 return dc
87}
88
89func (dc *downstreamConn) prefix() *irc.Prefix {
90 return &irc.Prefix{
91 Name: dc.nick,
92 User: dc.username,
93 // TODO: fill the host?
94 }
95}
96
97func (dc *downstreamConn) marshalChannel(uc *upstreamConn, name string) string {
98 return name
99}
100
101func (dc *downstreamConn) forEachUpstream(f func(*upstreamConn)) {
102 dc.user.forEachUpstream(func(uc *upstreamConn) {
103 if dc.network != nil && uc.network != dc.network {
104 return
105 }
106 f(uc)
107 })
108}
109
110func (dc *downstreamConn) unmarshalChannel(name string) (*upstreamConn, string, error) {
111 // TODO: extract network name from channel name if dc.upstream == nil
112 var channel *upstreamChannel
113 var err error
114 dc.forEachUpstream(func(uc *upstreamConn) {
115 if err != nil {
116 return
117 }
118 if ch, ok := uc.channels[name]; ok {
119 if channel != nil {
120 err = fmt.Errorf("ambiguous channel name %q", name)
121 } else {
122 channel = ch
123 }
124 }
125 })
126 if channel == nil {
127 return nil, "", ircError{&irc.Message{
128 Command: irc.ERR_NOSUCHCHANNEL,
129 Params: []string{name, "No such channel"},
130 }}
131 }
132 return channel.conn, channel.Name, nil
133}
134
135func (dc *downstreamConn) marshalNick(uc *upstreamConn, nick string) string {
136 if nick == uc.nick {
137 return dc.nick
138 }
139 return nick
140}
141
142func (dc *downstreamConn) marshalUserPrefix(uc *upstreamConn, prefix *irc.Prefix) *irc.Prefix {
143 if prefix.Name == uc.nick {
144 return dc.prefix()
145 }
146 return prefix
147}
148
149func (dc *downstreamConn) isClosed() bool {
150 select {
151 case <-dc.closed:
152 return true
153 default:
154 return false
155 }
156}
157
158func (dc *downstreamConn) readMessages() error {
159 dc.logger.Printf("new connection")
160
161 for {
162 msg, err := dc.irc.ReadMessage()
163 if err == io.EOF {
164 break
165 } else if err != nil {
166 return fmt.Errorf("failed to read IRC command: %v", err)
167 }
168
169 if dc.srv.Debug {
170 dc.logger.Printf("received: %v", msg)
171 }
172
173 err = dc.handleMessage(msg)
174 if ircErr, ok := err.(ircError); ok {
175 ircErr.Message.Prefix = dc.srv.prefix()
176 dc.SendMessage(ircErr.Message)
177 } else if err != nil {
178 return fmt.Errorf("failed to handle IRC command %q: %v", msg.Command, err)
179 }
180
181 if dc.isClosed() {
182 return nil
183 }
184 }
185
186 return nil
187}
188
189func (dc *downstreamConn) writeMessages() error {
190 for {
191 var err error
192 var closed bool
193 select {
194 case msg := <-dc.messages:
195 if dc.srv.Debug {
196 dc.logger.Printf("sent: %v", msg)
197 }
198 err = dc.irc.WriteMessage(msg)
199 case consumption := <-dc.consumptions:
200 consumer, uc := consumption.consumer, consumption.upstreamConn
201 for {
202 msg := consumer.Peek()
203 if msg == nil {
204 break
205 }
206 msg = msg.Copy()
207 switch msg.Command {
208 case "PRIVMSG":
209 // TODO: detect whether it's a user or a channel
210 msg.Params[0] = dc.marshalChannel(uc, msg.Params[0])
211 default:
212 panic("expected to consume a PRIVMSG message")
213 }
214 if dc.srv.Debug {
215 dc.logger.Printf("sent: %v", msg)
216 }
217 err = dc.irc.WriteMessage(msg)
218 if err != nil {
219 break
220 }
221 consumer.Consume()
222 }
223 case <-dc.closed:
224 closed = true
225 }
226 if err != nil {
227 return err
228 }
229 if closed {
230 break
231 }
232 }
233 return nil
234}
235
236func (dc *downstreamConn) Close() error {
237 if dc.isClosed() {
238 return fmt.Errorf("downstream connection already closed")
239 }
240
241 if u := dc.user; u != nil {
242 u.lock.Lock()
243 for i := range u.downstreamConns {
244 if u.downstreamConns[i] == dc {
245 u.downstreamConns = append(u.downstreamConns[:i], u.downstreamConns[i+1:]...)
246 break
247 }
248 }
249 u.lock.Unlock()
250 }
251
252 close(dc.closed)
253 return nil
254}
255
256func (dc *downstreamConn) SendMessage(msg *irc.Message) {
257 dc.messages <- msg
258}
259
260func (dc *downstreamConn) handleMessage(msg *irc.Message) error {
261 switch msg.Command {
262 case "QUIT":
263 return dc.Close()
264 case "PING":
265 dc.SendMessage(&irc.Message{
266 Prefix: dc.srv.prefix(),
267 Command: "PONG",
268 Params: msg.Params,
269 })
270 return nil
271 default:
272 if dc.registered {
273 return dc.handleMessageRegistered(msg)
274 } else {
275 return dc.handleMessageUnregistered(msg)
276 }
277 }
278}
279
280func (dc *downstreamConn) handleMessageUnregistered(msg *irc.Message) error {
281 switch msg.Command {
282 case "NICK":
283 if err := parseMessageParams(msg, &dc.nick); err != nil {
284 return err
285 }
286 case "USER":
287 var username string
288 if err := parseMessageParams(msg, &username, nil, nil, &dc.realname); err != nil {
289 return err
290 }
291 dc.username = "~" + username
292 default:
293 dc.logger.Printf("unhandled message: %v", msg)
294 return newUnknownCommandError(msg.Command)
295 }
296 if dc.username != "" && dc.nick != "" {
297 return dc.register()
298 }
299 return nil
300}
301
302func (dc *downstreamConn) register() error {
303 username := strings.TrimPrefix(dc.username, "~")
304 var networkName string
305 if i := strings.LastIndexAny(username, "/@"); i >= 0 {
306 networkName = username[i+1:]
307 }
308 if i := strings.IndexAny(username, "/@"); i >= 0 {
309 username = username[:i]
310 }
311
312 u := dc.srv.getUser(username)
313 if u == nil {
314 dc.logger.Printf("failed authentication: unknown username %q", username)
315 dc.SendMessage(&irc.Message{
316 Prefix: dc.srv.prefix(),
317 Command: irc.ERR_PASSWDMISMATCH,
318 Params: []string{"*", "Invalid username or password"},
319 })
320 return nil
321 }
322
323 if networkName != "" {
324 dc.network = dc.user.getNetwork(networkName)
325 if dc.network == nil {
326 dc.logger.Printf("failed registration: unknown network %q", networkName)
327 dc.SendMessage(&irc.Message{
328 Prefix: dc.srv.prefix(),
329 Command: irc.ERR_PASSWDMISMATCH,
330 Params: []string{"*", fmt.Sprintf("Unknown network %q", networkName)},
331 })
332 return nil
333 }
334 }
335
336 dc.registered = true
337 dc.user = u
338
339 u.lock.Lock()
340 firstDownstream := len(u.downstreamConns) == 0
341 u.downstreamConns = append(u.downstreamConns, dc)
342 u.lock.Unlock()
343
344 dc.SendMessage(&irc.Message{
345 Prefix: dc.srv.prefix(),
346 Command: irc.RPL_WELCOME,
347 Params: []string{dc.nick, "Welcome to jounce, " + dc.nick},
348 })
349 dc.SendMessage(&irc.Message{
350 Prefix: dc.srv.prefix(),
351 Command: irc.RPL_YOURHOST,
352 Params: []string{dc.nick, "Your host is " + dc.srv.Hostname},
353 })
354 dc.SendMessage(&irc.Message{
355 Prefix: dc.srv.prefix(),
356 Command: irc.RPL_CREATED,
357 Params: []string{dc.nick, "Who cares when the server was created?"},
358 })
359 dc.SendMessage(&irc.Message{
360 Prefix: dc.srv.prefix(),
361 Command: irc.RPL_MYINFO,
362 Params: []string{dc.nick, dc.srv.Hostname, "jounce", "aiwroO", "OovaimnqpsrtklbeI"},
363 })
364 dc.SendMessage(&irc.Message{
365 Prefix: dc.srv.prefix(),
366 Command: irc.ERR_NOMOTD,
367 Params: []string{dc.nick, "No MOTD"},
368 })
369
370 dc.forEachUpstream(func(uc *upstreamConn) {
371 // TODO: fix races accessing upstream connection data
372 for _, ch := range uc.channels {
373 if ch.complete {
374 forwardChannel(dc, ch)
375 }
376 }
377
378 historyName := dc.username
379
380 var seqPtr *uint64
381 if firstDownstream {
382 seq, ok := uc.history[historyName]
383 if ok {
384 seqPtr = &seq
385 }
386 }
387
388 consumer, ch := uc.ring.NewConsumer(seqPtr)
389 go func() {
390 for {
391 var closed bool
392 select {
393 case <-ch:
394 dc.consumptions <- consumption{consumer, uc}
395 case <-dc.closed:
396 closed = true
397 }
398 if closed {
399 break
400 }
401 }
402
403 seq := consumer.Close()
404
405 dc.user.lock.Lock()
406 lastDownstream := len(dc.user.downstreamConns) == 0
407 dc.user.lock.Unlock()
408
409 if lastDownstream {
410 uc.history[historyName] = seq
411 }
412 }()
413 })
414
415 return nil
416}
417
418func (dc *downstreamConn) handleMessageRegistered(msg *irc.Message) error {
419 switch msg.Command {
420 case "USER":
421 return ircError{&irc.Message{
422 Command: irc.ERR_ALREADYREGISTERED,
423 Params: []string{dc.nick, "You may not reregister"},
424 }}
425 case "NICK":
426 dc.forEachUpstream(func(uc *upstreamConn) {
427 uc.SendMessage(msg)
428 })
429 case "JOIN", "PART":
430 var name string
431 if err := parseMessageParams(msg, &name); err != nil {
432 return err
433 }
434
435 uc, upstreamName, err := dc.unmarshalChannel(name)
436 if err != nil {
437 return ircError{&irc.Message{
438 Command: irc.ERR_NOSUCHCHANNEL,
439 Params: []string{name, err.Error()},
440 }}
441 }
442
443 uc.SendMessage(&irc.Message{
444 Command: msg.Command,
445 Params: []string{upstreamName},
446 })
447 // TODO: add/remove channel from upstream config
448 case "MODE":
449 if msg.Prefix == nil {
450 return fmt.Errorf("missing prefix")
451 }
452
453 var name string
454 if err := parseMessageParams(msg, &name); err != nil {
455 return err
456 }
457
458 var modeStr string
459 if len(msg.Params) > 1 {
460 modeStr = msg.Params[1]
461 }
462
463 if msg.Prefix.Name != name {
464 uc, upstreamName, err := dc.unmarshalChannel(name)
465 if err != nil {
466 return err
467 }
468
469 if modeStr != "" {
470 uc.SendMessage(&irc.Message{
471 Command: "MODE",
472 Params: []string{upstreamName, modeStr},
473 })
474 } else {
475 ch, ok := uc.channels[upstreamName]
476 if !ok {
477 return ircError{&irc.Message{
478 Command: irc.ERR_NOSUCHCHANNEL,
479 Params: []string{name, "No such channel"},
480 }}
481 }
482
483 dc.SendMessage(&irc.Message{
484 Prefix: dc.srv.prefix(),
485 Command: irc.RPL_CHANNELMODEIS,
486 Params: []string{name, string(ch.modes)},
487 })
488 }
489 } else {
490 if name != dc.nick {
491 return ircError{&irc.Message{
492 Command: irc.ERR_USERSDONTMATCH,
493 Params: []string{dc.nick, "Cannot change mode for other users"},
494 }}
495 }
496
497 if modeStr != "" {
498 dc.forEachUpstream(func(uc *upstreamConn) {
499 uc.SendMessage(&irc.Message{
500 Command: "MODE",
501 Params: []string{uc.nick, modeStr},
502 })
503 })
504 } else {
505 dc.SendMessage(&irc.Message{
506 Prefix: dc.srv.prefix(),
507 Command: irc.RPL_UMODEIS,
508 Params: []string{""}, // TODO
509 })
510 }
511 }
512 case "PRIVMSG":
513 var targetsStr, text string
514 if err := parseMessageParams(msg, &targetsStr, &text); err != nil {
515 return err
516 }
517
518 for _, name := range strings.Split(targetsStr, ",") {
519 uc, upstreamName, err := dc.unmarshalChannel(name)
520 if err != nil {
521 return err
522 }
523
524 uc.SendMessage(&irc.Message{
525 Command: "PRIVMSG",
526 Params: []string{upstreamName, text},
527 })
528 }
529 default:
530 dc.logger.Printf("unhandled message: %v", msg)
531 return newUnknownCommandError(msg.Command)
532 }
533 return nil
534}
Note: See TracBrowser for help on using the repository browser.