source: code/trunk/downstream.go@ 91

Last change on this file since 91 was 91, checked in by contact, 5 years ago

Auto-save IRC networks

File size: 13.5 KB
Rev Line
[13]1package jounce
2
3import (
[91]4 "crypto/tls"
[13]5 "fmt"
6 "io"
7 "net"
[39]8 "strings"
[91]9 "time"
[13]10
[85]11 "golang.org/x/crypto/bcrypt"
[13]12 "gopkg.in/irc.v3"
13)
14
15type ircError struct {
16 Message *irc.Message
17}
18
[85]19func (err ircError) Error() string {
20 return err.Message.String()
21}
22
[13]23func newUnknownCommandError(cmd string) ircError {
24 return ircError{&irc.Message{
25 Command: irc.ERR_UNKNOWNCOMMAND,
26 Params: []string{
27 "*",
28 cmd,
29 "Unknown command",
30 },
31 }}
32}
33
34func newNeedMoreParamsError(cmd string) ircError {
35 return ircError{&irc.Message{
36 Command: irc.ERR_NEEDMOREPARAMS,
37 Params: []string{
38 "*",
39 cmd,
40 "Not enough parameters",
41 },
42 }}
43}
44
[85]45var errAuthFailed = ircError{&irc.Message{
46 Command: irc.ERR_PASSWDMISMATCH,
47 Params: []string{"*", "Invalid username or password"},
48}}
[13]49
[69]50type consumption struct {
51 consumer *RingConsumer
52 upstreamConn *upstreamConn
53}
54
[13]55type downstreamConn struct {
[69]56 net net.Conn
57 irc *irc.Conn
58 srv *Server
59 logger Logger
60 messages chan *irc.Message
61 consumptions chan consumption
62 closed chan struct{}
[22]63
[13]64 registered bool
[37]65 user *user
[13]66 nick string
67 username string
68 realname string
[85]69 password string // empty after authentication
[77]70 network *network // can be nil
[13]71}
72
[22]73func newDownstreamConn(srv *Server, netConn net.Conn) *downstreamConn {
[55]74 dc := &downstreamConn{
[69]75 net: netConn,
76 irc: irc.NewConn(netConn),
77 srv: srv,
78 logger: &prefixLogger{srv.Logger, fmt.Sprintf("downstream %q: ", netConn.RemoteAddr())},
79 messages: make(chan *irc.Message, 64),
80 consumptions: make(chan consumption),
81 closed: make(chan struct{}),
[22]82 }
[26]83
84 go func() {
[56]85 if err := dc.writeMessages(); err != nil {
86 dc.logger.Printf("failed to write message: %v", err)
[26]87 }
[55]88 if err := dc.net.Close(); err != nil {
89 dc.logger.Printf("failed to close connection: %v", err)
[45]90 } else {
[55]91 dc.logger.Printf("connection closed")
[45]92 }
[26]93 }()
94
[55]95 return dc
[22]96}
97
[55]98func (dc *downstreamConn) prefix() *irc.Prefix {
[27]99 return &irc.Prefix{
[55]100 Name: dc.nick,
101 User: dc.username,
[27]102 // TODO: fill the host?
103 }
104}
105
[69]106func (dc *downstreamConn) marshalChannel(uc *upstreamConn, name string) string {
107 return name
108}
109
[90]110func (dc *downstreamConn) forEachNetwork(f func(*network)) {
111 if dc.network != nil {
112 f(dc.network)
113 } else {
114 dc.user.forEachNetwork(f)
115 }
116}
117
[73]118func (dc *downstreamConn) forEachUpstream(f func(*upstreamConn)) {
119 dc.user.forEachUpstream(func(uc *upstreamConn) {
[77]120 if dc.network != nil && uc.network != dc.network {
[73]121 return
122 }
123 f(uc)
124 })
125}
126
[89]127// upstream returns the upstream connection, if any. If there are zero or if
128// there are multiple upstream connections, it returns nil.
129func (dc *downstreamConn) upstream() *upstreamConn {
130 if dc.network == nil {
131 return nil
132 }
133
134 var upstream *upstreamConn
135 dc.forEachUpstream(func(uc *upstreamConn) {
136 upstream = uc
137 })
138 return upstream
139}
140
[69]141func (dc *downstreamConn) unmarshalChannel(name string) (*upstreamConn, string, error) {
[89]142 if uc := dc.upstream(); uc != nil {
143 return uc, name, nil
144 }
145
[73]146 // TODO: extract network name from channel name if dc.upstream == nil
147 var channel *upstreamChannel
148 var err error
149 dc.forEachUpstream(func(uc *upstreamConn) {
150 if err != nil {
151 return
152 }
153 if ch, ok := uc.channels[name]; ok {
154 if channel != nil {
155 err = fmt.Errorf("ambiguous channel name %q", name)
156 } else {
157 channel = ch
158 }
159 }
160 })
161 if channel == nil {
162 return nil, "", ircError{&irc.Message{
163 Command: irc.ERR_NOSUCHCHANNEL,
164 Params: []string{name, "No such channel"},
165 }}
[69]166 }
[73]167 return channel.conn, channel.Name, nil
[69]168}
169
170func (dc *downstreamConn) marshalNick(uc *upstreamConn, nick string) string {
171 if nick == uc.nick {
172 return dc.nick
173 }
174 return nick
175}
176
177func (dc *downstreamConn) marshalUserPrefix(uc *upstreamConn, prefix *irc.Prefix) *irc.Prefix {
178 if prefix.Name == uc.nick {
179 return dc.prefix()
180 }
181 return prefix
182}
183
[57]184func (dc *downstreamConn) isClosed() bool {
185 select {
186 case <-dc.closed:
187 return true
188 default:
189 return false
190 }
191}
192
[55]193func (dc *downstreamConn) readMessages() error {
194 dc.logger.Printf("new connection")
[22]195
196 for {
[55]197 msg, err := dc.irc.ReadMessage()
[22]198 if err == io.EOF {
199 break
200 } else if err != nil {
201 return fmt.Errorf("failed to read IRC command: %v", err)
202 }
203
[64]204 if dc.srv.Debug {
205 dc.logger.Printf("received: %v", msg)
206 }
207
[55]208 err = dc.handleMessage(msg)
[22]209 if ircErr, ok := err.(ircError); ok {
[55]210 ircErr.Message.Prefix = dc.srv.prefix()
211 dc.SendMessage(ircErr.Message)
[22]212 } else if err != nil {
213 return fmt.Errorf("failed to handle IRC command %q: %v", msg.Command, err)
214 }
215
[57]216 if dc.isClosed() {
[22]217 return nil
218 }
219 }
220
[45]221 return nil
[22]222}
223
[56]224func (dc *downstreamConn) writeMessages() error {
[57]225 for {
226 var err error
227 var closed bool
228 select {
229 case msg := <-dc.messages:
[64]230 if dc.srv.Debug {
231 dc.logger.Printf("sent: %v", msg)
232 }
[57]233 err = dc.irc.WriteMessage(msg)
[69]234 case consumption := <-dc.consumptions:
235 consumer, uc := consumption.consumer, consumption.upstreamConn
[57]236 for {
237 msg := consumer.Peek()
238 if msg == nil {
239 break
240 }
[69]241 msg = msg.Copy()
242 switch msg.Command {
243 case "PRIVMSG":
244 // TODO: detect whether it's a user or a channel
245 msg.Params[0] = dc.marshalChannel(uc, msg.Params[0])
246 default:
247 panic("expected to consume a PRIVMSG message")
248 }
[64]249 if dc.srv.Debug {
250 dc.logger.Printf("sent: %v", msg)
251 }
[57]252 err = dc.irc.WriteMessage(msg)
253 if err != nil {
254 break
255 }
256 consumer.Consume()
257 }
258 case <-dc.closed:
259 closed = true
260 }
261 if err != nil {
[56]262 return err
263 }
[57]264 if closed {
265 break
266 }
[56]267 }
268 return nil
269}
270
[55]271func (dc *downstreamConn) Close() error {
[57]272 if dc.isClosed() {
[26]273 return fmt.Errorf("downstream connection already closed")
274 }
[40]275
[55]276 if u := dc.user; u != nil {
[40]277 u.lock.Lock()
278 for i := range u.downstreamConns {
[55]279 if u.downstreamConns[i] == dc {
[40]280 u.downstreamConns = append(u.downstreamConns[:i], u.downstreamConns[i+1:]...)
[63]281 break
[40]282 }
283 }
284 u.lock.Unlock()
[13]285 }
[40]286
[57]287 close(dc.closed)
[45]288 return nil
[13]289}
290
[55]291func (dc *downstreamConn) SendMessage(msg *irc.Message) {
292 dc.messages <- msg
[54]293}
294
[55]295func (dc *downstreamConn) handleMessage(msg *irc.Message) error {
[13]296 switch msg.Command {
[28]297 case "QUIT":
[55]298 return dc.Close()
[13]299 case "PING":
[55]300 dc.SendMessage(&irc.Message{
301 Prefix: dc.srv.prefix(),
[13]302 Command: "PONG",
[68]303 Params: msg.Params,
[54]304 })
[26]305 return nil
[13]306 default:
[55]307 if dc.registered {
308 return dc.handleMessageRegistered(msg)
[13]309 } else {
[55]310 return dc.handleMessageUnregistered(msg)
[13]311 }
312 }
313}
314
[55]315func (dc *downstreamConn) handleMessageUnregistered(msg *irc.Message) error {
[13]316 switch msg.Command {
317 case "NICK":
[55]318 if err := parseMessageParams(msg, &dc.nick); err != nil {
[43]319 return err
[13]320 }
321 case "USER":
[43]322 var username string
[55]323 if err := parseMessageParams(msg, &username, nil, nil, &dc.realname); err != nil {
[43]324 return err
[13]325 }
[55]326 dc.username = "~" + username
[85]327 case "PASS":
328 if err := parseMessageParams(msg, &dc.password); err != nil {
329 return err
330 }
[13]331 default:
[55]332 dc.logger.Printf("unhandled message: %v", msg)
[13]333 return newUnknownCommandError(msg.Command)
334 }
[55]335 if dc.username != "" && dc.nick != "" {
336 return dc.register()
[13]337 }
338 return nil
339}
340
// sanityCheckServer checks that a TLS connection to addr can be established.
// It dials addr over TCP with a 30 second timeout using the default TLS
// configuration and closes the connection right away. The dial error, or the
// error from closing the connection, is returned.
func sanityCheckServer(addr string) error {
	conn, err := tls.DialWithDialer(&net.Dialer{Timeout: 30 * time.Second}, "tcp", addr, nil)
	if err != nil {
		return err
	}
	return conn.Close()
}
349
[55]350func (dc *downstreamConn) register() error {
[73]351 username := strings.TrimPrefix(dc.username, "~")
[77]352 var networkName string
[73]353 if i := strings.LastIndexAny(username, "/@"); i >= 0 {
[77]354 networkName = username[i+1:]
[73]355 }
356 if i := strings.IndexAny(username, "/@"); i >= 0 {
357 username = username[:i]
358 }
359
[85]360 password := dc.password
361 dc.password = ""
362
[73]363 u := dc.srv.getUser(username)
[38]364 if u == nil {
[85]365 dc.logger.Printf("failed authentication for %q: unknown username", username)
366 return errAuthFailed
[37]367 }
368
[85]369 err := bcrypt.CompareHashAndPassword([]byte(u.Password), []byte(password))
370 if err != nil {
371 dc.logger.Printf("failed authentication for %q: %v", username, err)
372 return errAuthFailed
373 }
374
[88]375 var network *network
[77]376 if networkName != "" {
[88]377 network = u.getNetwork(networkName)
378 if network == nil {
[91]379 addr := networkName
380 if !strings.ContainsRune(addr, ':') {
381 addr = addr + ":6697"
382 }
383
384 dc.logger.Printf("trying to connect to new upstream server %q", addr)
385 if err := sanityCheckServer(addr); err != nil {
386 dc.logger.Printf("failed to connect to %q: %v", addr, err)
387 return ircError{&irc.Message{
388 Command: irc.ERR_PASSWDMISMATCH,
389 Params: []string{"*", fmt.Sprintf("Failed to connect to %q", networkName)},
390 }}
391 }
392
393 dc.logger.Printf("auto-adding network %q", networkName)
394 network, err = u.createNetwork(networkName, dc.nick)
395 if err != nil {
396 return err
397 }
[73]398 }
399 }
400
[55]401 dc.registered = true
402 dc.user = u
[88]403 dc.network = network
[13]404
[40]405 u.lock.Lock()
[57]406 firstDownstream := len(u.downstreamConns) == 0
[55]407 u.downstreamConns = append(u.downstreamConns, dc)
[40]408 u.lock.Unlock()
409
[55]410 dc.SendMessage(&irc.Message{
411 Prefix: dc.srv.prefix(),
[13]412 Command: irc.RPL_WELCOME,
[55]413 Params: []string{dc.nick, "Welcome to jounce, " + dc.nick},
[54]414 })
[55]415 dc.SendMessage(&irc.Message{
416 Prefix: dc.srv.prefix(),
[13]417 Command: irc.RPL_YOURHOST,
[55]418 Params: []string{dc.nick, "Your host is " + dc.srv.Hostname},
[54]419 })
[55]420 dc.SendMessage(&irc.Message{
421 Prefix: dc.srv.prefix(),
[13]422 Command: irc.RPL_CREATED,
[55]423 Params: []string{dc.nick, "Who cares when the server was created?"},
[54]424 })
[55]425 dc.SendMessage(&irc.Message{
426 Prefix: dc.srv.prefix(),
[13]427 Command: irc.RPL_MYINFO,
[55]428 Params: []string{dc.nick, dc.srv.Hostname, "jounce", "aiwroO", "OovaimnqpsrtklbeI"},
[54]429 })
[55]430 dc.SendMessage(&irc.Message{
431 Prefix: dc.srv.prefix(),
[13]432 Command: irc.ERR_NOMOTD,
[55]433 Params: []string{dc.nick, "No MOTD"},
[54]434 })
[13]435
[73]436 dc.forEachUpstream(func(uc *upstreamConn) {
[30]437 // TODO: fix races accessing upstream connection data
438 for _, ch := range uc.channels {
439 if ch.complete {
[55]440 forwardChannel(dc, ch)
[30]441 }
442 }
[50]443
[73]444 historyName := dc.username
[57]445
446 var seqPtr *uint64
447 if firstDownstream {
448 seq, ok := uc.history[historyName]
449 if ok {
450 seqPtr = &seq
[50]451 }
452 }
[57]453
[59]454 consumer, ch := uc.ring.NewConsumer(seqPtr)
[57]455 go func() {
456 for {
457 var closed bool
458 select {
459 case <-ch:
[69]460 dc.consumptions <- consumption{consumer, uc}
[57]461 case <-dc.closed:
462 closed = true
463 }
464 if closed {
465 break
466 }
467 }
468
469 seq := consumer.Close()
470
471 dc.user.lock.Lock()
472 lastDownstream := len(dc.user.downstreamConns) == 0
473 dc.user.lock.Unlock()
474
475 if lastDownstream {
476 uc.history[historyName] = seq
477 }
478 }()
[39]479 })
[50]480
[13]481 return nil
482}
483
[55]484func (dc *downstreamConn) handleMessageRegistered(msg *irc.Message) error {
[13]485 switch msg.Command {
[42]486 case "USER":
[13]487 return ircError{&irc.Message{
488 Command: irc.ERR_ALREADYREGISTERED,
[55]489 Params: []string{dc.nick, "You may not reregister"},
[13]490 }}
[42]491 case "NICK":
[90]492 var nick string
493 if err := parseMessageParams(msg, &nick); err != nil {
494 return err
495 }
496
497 var err error
498 dc.forEachNetwork(func(n *network) {
499 if err != nil {
500 return
501 }
502 n.Nick = nick
503 err = dc.srv.db.StoreNetwork(dc.user.Username, &n.Network)
504 })
505 if err != nil {
506 return err
507 }
508
[73]509 dc.forEachUpstream(func(uc *upstreamConn) {
[60]510 uc.SendMessage(msg)
[42]511 })
[69]512 case "JOIN", "PART":
[48]513 var name string
514 if err := parseMessageParams(msg, &name); err != nil {
515 return err
516 }
517
[69]518 uc, upstreamName, err := dc.unmarshalChannel(name)
519 if err != nil {
520 return ircError{&irc.Message{
521 Command: irc.ERR_NOSUCHCHANNEL,
522 Params: []string{name, err.Error()},
523 }}
[48]524 }
525
[69]526 uc.SendMessage(&irc.Message{
527 Command: msg.Command,
528 Params: []string{upstreamName},
529 })
[89]530
531 switch msg.Command {
532 case "JOIN":
533 err := dc.srv.db.StoreChannel(uc.network.ID, &Channel{
534 Name: upstreamName,
535 })
536 if err != nil {
537 dc.logger.Printf("failed to create channel %q in DB: %v", upstreamName, err)
538 }
539 case "PART":
540 if err := dc.srv.db.DeleteChannel(uc.network.ID, upstreamName); err != nil {
541 dc.logger.Printf("failed to delete channel %q in DB: %v", upstreamName, err)
542 }
543 }
[69]544 case "MODE":
545 if msg.Prefix == nil {
546 return fmt.Errorf("missing prefix")
[49]547 }
548
[46]549 var name string
550 if err := parseMessageParams(msg, &name); err != nil {
551 return err
552 }
553
554 var modeStr string
555 if len(msg.Params) > 1 {
556 modeStr = msg.Params[1]
557 }
558
559 if msg.Prefix.Name != name {
[69]560 uc, upstreamName, err := dc.unmarshalChannel(name)
[46]561 if err != nil {
562 return err
563 }
564
565 if modeStr != "" {
[69]566 uc.SendMessage(&irc.Message{
567 Command: "MODE",
568 Params: []string{upstreamName, modeStr},
569 })
[46]570 } else {
[69]571 ch, ok := uc.channels[upstreamName]
572 if !ok {
573 return ircError{&irc.Message{
574 Command: irc.ERR_NOSUCHCHANNEL,
575 Params: []string{name, "No such channel"},
576 }}
577 }
578
[55]579 dc.SendMessage(&irc.Message{
580 Prefix: dc.srv.prefix(),
[46]581 Command: irc.RPL_CHANNELMODEIS,
[69]582 Params: []string{name, string(ch.modes)},
[54]583 })
[46]584 }
585 } else {
[55]586 if name != dc.nick {
[46]587 return ircError{&irc.Message{
588 Command: irc.ERR_USERSDONTMATCH,
[55]589 Params: []string{dc.nick, "Cannot change mode for other users"},
[46]590 }}
591 }
592
593 if modeStr != "" {
[73]594 dc.forEachUpstream(func(uc *upstreamConn) {
[69]595 uc.SendMessage(&irc.Message{
596 Command: "MODE",
597 Params: []string{uc.nick, modeStr},
598 })
[46]599 })
600 } else {
[55]601 dc.SendMessage(&irc.Message{
602 Prefix: dc.srv.prefix(),
[46]603 Command: irc.RPL_UMODEIS,
604 Params: []string{""}, // TODO
[54]605 })
[46]606 }
607 }
[58]608 case "PRIVMSG":
609 var targetsStr, text string
610 if err := parseMessageParams(msg, &targetsStr, &text); err != nil {
611 return err
612 }
613
614 for _, name := range strings.Split(targetsStr, ",") {
[69]615 uc, upstreamName, err := dc.unmarshalChannel(name)
[58]616 if err != nil {
617 return err
618 }
619
[69]620 uc.SendMessage(&irc.Message{
[58]621 Command: "PRIVMSG",
[69]622 Params: []string{upstreamName, text},
[60]623 })
[58]624 }
[13]625 default:
[55]626 dc.logger.Printf("unhandled message: %v", msg)
[13]627 return newUnknownCommandError(msg.Command)
628 }
[42]629 return nil
[13]630}
Note: See TracBrowser for help on using the repository browser.