source: code/trunk/downstream.go@ 89

Last change on this file since 89 was 89, checked in by contact, 5 years ago

Update DB on JOIN and PART

File size: 12.5 KB
RevLine 
[13]1package jounce
2
3import (
4 "fmt"
5 "io"
6 "net"
[39]7 "strings"
[13]8
[85]9 "golang.org/x/crypto/bcrypt"
[13]10 "gopkg.in/irc.v3"
11)
12
13type ircError struct {
14 Message *irc.Message
15}
16
[85]17func (err ircError) Error() string {
18 return err.Message.String()
19}
20
[13]21func newUnknownCommandError(cmd string) ircError {
22 return ircError{&irc.Message{
23 Command: irc.ERR_UNKNOWNCOMMAND,
24 Params: []string{
25 "*",
26 cmd,
27 "Unknown command",
28 },
29 }}
30}
31
32func newNeedMoreParamsError(cmd string) ircError {
33 return ircError{&irc.Message{
34 Command: irc.ERR_NEEDMOREPARAMS,
35 Params: []string{
36 "*",
37 cmd,
38 "Not enough parameters",
39 },
40 }}
41}
42
[85]43var errAuthFailed = ircError{&irc.Message{
44 Command: irc.ERR_PASSWDMISMATCH,
45 Params: []string{"*", "Invalid username or password"},
46}}
[13]47
[69]48type consumption struct {
49 consumer *RingConsumer
50 upstreamConn *upstreamConn
51}
52
[13]53type downstreamConn struct {
[69]54 net net.Conn
55 irc *irc.Conn
56 srv *Server
57 logger Logger
58 messages chan *irc.Message
59 consumptions chan consumption
60 closed chan struct{}
[22]61
[13]62 registered bool
[37]63 user *user
[13]64 nick string
65 username string
66 realname string
[85]67 password string // empty after authentication
[77]68 network *network // can be nil
[13]69}
70
[22]71func newDownstreamConn(srv *Server, netConn net.Conn) *downstreamConn {
[55]72 dc := &downstreamConn{
[69]73 net: netConn,
74 irc: irc.NewConn(netConn),
75 srv: srv,
76 logger: &prefixLogger{srv.Logger, fmt.Sprintf("downstream %q: ", netConn.RemoteAddr())},
77 messages: make(chan *irc.Message, 64),
78 consumptions: make(chan consumption),
79 closed: make(chan struct{}),
[22]80 }
[26]81
82 go func() {
[56]83 if err := dc.writeMessages(); err != nil {
84 dc.logger.Printf("failed to write message: %v", err)
[26]85 }
[55]86 if err := dc.net.Close(); err != nil {
87 dc.logger.Printf("failed to close connection: %v", err)
[45]88 } else {
[55]89 dc.logger.Printf("connection closed")
[45]90 }
[26]91 }()
92
[55]93 return dc
[22]94}
95
[55]96func (dc *downstreamConn) prefix() *irc.Prefix {
[27]97 return &irc.Prefix{
[55]98 Name: dc.nick,
99 User: dc.username,
[27]100 // TODO: fill the host?
101 }
102}
103
[69]104func (dc *downstreamConn) marshalChannel(uc *upstreamConn, name string) string {
105 return name
106}
107
[73]108func (dc *downstreamConn) forEachUpstream(f func(*upstreamConn)) {
109 dc.user.forEachUpstream(func(uc *upstreamConn) {
[77]110 if dc.network != nil && uc.network != dc.network {
[73]111 return
112 }
113 f(uc)
114 })
115}
116
[89]117// upstream returns the upstream connection, if any. If there are zero or if
118// there are multiple upstream connections, it returns nil.
119func (dc *downstreamConn) upstream() *upstreamConn {
120 if dc.network == nil {
121 return nil
122 }
123
124 var upstream *upstreamConn
125 dc.forEachUpstream(func(uc *upstreamConn) {
126 upstream = uc
127 })
128 return upstream
129}
130
[69]131func (dc *downstreamConn) unmarshalChannel(name string) (*upstreamConn, string, error) {
[89]132 if uc := dc.upstream(); uc != nil {
133 return uc, name, nil
134 }
135
[73]136 // TODO: extract network name from channel name if dc.upstream == nil
137 var channel *upstreamChannel
138 var err error
139 dc.forEachUpstream(func(uc *upstreamConn) {
140 if err != nil {
141 return
142 }
143 if ch, ok := uc.channels[name]; ok {
144 if channel != nil {
145 err = fmt.Errorf("ambiguous channel name %q", name)
146 } else {
147 channel = ch
148 }
149 }
150 })
151 if channel == nil {
152 return nil, "", ircError{&irc.Message{
153 Command: irc.ERR_NOSUCHCHANNEL,
154 Params: []string{name, "No such channel"},
155 }}
[69]156 }
[73]157 return channel.conn, channel.Name, nil
[69]158}
159
160func (dc *downstreamConn) marshalNick(uc *upstreamConn, nick string) string {
161 if nick == uc.nick {
162 return dc.nick
163 }
164 return nick
165}
166
167func (dc *downstreamConn) marshalUserPrefix(uc *upstreamConn, prefix *irc.Prefix) *irc.Prefix {
168 if prefix.Name == uc.nick {
169 return dc.prefix()
170 }
171 return prefix
172}
173
[57]174func (dc *downstreamConn) isClosed() bool {
175 select {
176 case <-dc.closed:
177 return true
178 default:
179 return false
180 }
181}
182
[55]183func (dc *downstreamConn) readMessages() error {
184 dc.logger.Printf("new connection")
[22]185
186 for {
[55]187 msg, err := dc.irc.ReadMessage()
[22]188 if err == io.EOF {
189 break
190 } else if err != nil {
191 return fmt.Errorf("failed to read IRC command: %v", err)
192 }
193
[64]194 if dc.srv.Debug {
195 dc.logger.Printf("received: %v", msg)
196 }
197
[55]198 err = dc.handleMessage(msg)
[22]199 if ircErr, ok := err.(ircError); ok {
[55]200 ircErr.Message.Prefix = dc.srv.prefix()
201 dc.SendMessage(ircErr.Message)
[22]202 } else if err != nil {
203 return fmt.Errorf("failed to handle IRC command %q: %v", msg.Command, err)
204 }
205
[57]206 if dc.isClosed() {
[22]207 return nil
208 }
209 }
210
[45]211 return nil
[22]212}
213
[56]214func (dc *downstreamConn) writeMessages() error {
[57]215 for {
216 var err error
217 var closed bool
218 select {
219 case msg := <-dc.messages:
[64]220 if dc.srv.Debug {
221 dc.logger.Printf("sent: %v", msg)
222 }
[57]223 err = dc.irc.WriteMessage(msg)
[69]224 case consumption := <-dc.consumptions:
225 consumer, uc := consumption.consumer, consumption.upstreamConn
[57]226 for {
227 msg := consumer.Peek()
228 if msg == nil {
229 break
230 }
[69]231 msg = msg.Copy()
232 switch msg.Command {
233 case "PRIVMSG":
234 // TODO: detect whether it's a user or a channel
235 msg.Params[0] = dc.marshalChannel(uc, msg.Params[0])
236 default:
237 panic("expected to consume a PRIVMSG message")
238 }
[64]239 if dc.srv.Debug {
240 dc.logger.Printf("sent: %v", msg)
241 }
[57]242 err = dc.irc.WriteMessage(msg)
243 if err != nil {
244 break
245 }
246 consumer.Consume()
247 }
248 case <-dc.closed:
249 closed = true
250 }
251 if err != nil {
[56]252 return err
253 }
[57]254 if closed {
255 break
256 }
[56]257 }
258 return nil
259}
260
[55]261func (dc *downstreamConn) Close() error {
[57]262 if dc.isClosed() {
[26]263 return fmt.Errorf("downstream connection already closed")
264 }
[40]265
[55]266 if u := dc.user; u != nil {
[40]267 u.lock.Lock()
268 for i := range u.downstreamConns {
[55]269 if u.downstreamConns[i] == dc {
[40]270 u.downstreamConns = append(u.downstreamConns[:i], u.downstreamConns[i+1:]...)
[63]271 break
[40]272 }
273 }
274 u.lock.Unlock()
[13]275 }
[40]276
[57]277 close(dc.closed)
[45]278 return nil
[13]279}
280
[55]281func (dc *downstreamConn) SendMessage(msg *irc.Message) {
282 dc.messages <- msg
[54]283}
284
[55]285func (dc *downstreamConn) handleMessage(msg *irc.Message) error {
[13]286 switch msg.Command {
[28]287 case "QUIT":
[55]288 return dc.Close()
[13]289 case "PING":
[55]290 dc.SendMessage(&irc.Message{
291 Prefix: dc.srv.prefix(),
[13]292 Command: "PONG",
[68]293 Params: msg.Params,
[54]294 })
[26]295 return nil
[13]296 default:
[55]297 if dc.registered {
298 return dc.handleMessageRegistered(msg)
[13]299 } else {
[55]300 return dc.handleMessageUnregistered(msg)
[13]301 }
302 }
303}
304
[55]305func (dc *downstreamConn) handleMessageUnregistered(msg *irc.Message) error {
[13]306 switch msg.Command {
307 case "NICK":
[55]308 if err := parseMessageParams(msg, &dc.nick); err != nil {
[43]309 return err
[13]310 }
311 case "USER":
[43]312 var username string
[55]313 if err := parseMessageParams(msg, &username, nil, nil, &dc.realname); err != nil {
[43]314 return err
[13]315 }
[55]316 dc.username = "~" + username
[85]317 case "PASS":
318 if err := parseMessageParams(msg, &dc.password); err != nil {
319 return err
320 }
[13]321 default:
[55]322 dc.logger.Printf("unhandled message: %v", msg)
[13]323 return newUnknownCommandError(msg.Command)
324 }
[55]325 if dc.username != "" && dc.nick != "" {
326 return dc.register()
[13]327 }
328 return nil
329}
330
[55]331func (dc *downstreamConn) register() error {
[73]332 username := strings.TrimPrefix(dc.username, "~")
[77]333 var networkName string
[73]334 if i := strings.LastIndexAny(username, "/@"); i >= 0 {
[77]335 networkName = username[i+1:]
[73]336 }
337 if i := strings.IndexAny(username, "/@"); i >= 0 {
338 username = username[:i]
339 }
340
[85]341 password := dc.password
342 dc.password = ""
343
[73]344 u := dc.srv.getUser(username)
[38]345 if u == nil {
[85]346 dc.logger.Printf("failed authentication for %q: unknown username", username)
347 return errAuthFailed
[37]348 }
349
[85]350 err := bcrypt.CompareHashAndPassword([]byte(u.Password), []byte(password))
351 if err != nil {
352 dc.logger.Printf("failed authentication for %q: %v", username, err)
353 return errAuthFailed
354 }
355
[88]356 var network *network
[77]357 if networkName != "" {
[88]358 network = u.getNetwork(networkName)
359 if network == nil {
[77]360 dc.logger.Printf("failed registration: unknown network %q", networkName)
[73]361 dc.SendMessage(&irc.Message{
362 Prefix: dc.srv.prefix(),
363 Command: irc.ERR_PASSWDMISMATCH,
[77]364 Params: []string{"*", fmt.Sprintf("Unknown network %q", networkName)},
[73]365 })
366 return nil
367 }
368 }
369
[55]370 dc.registered = true
371 dc.user = u
[88]372 dc.network = network
[13]373
[40]374 u.lock.Lock()
[57]375 firstDownstream := len(u.downstreamConns) == 0
[55]376 u.downstreamConns = append(u.downstreamConns, dc)
[40]377 u.lock.Unlock()
378
[55]379 dc.SendMessage(&irc.Message{
380 Prefix: dc.srv.prefix(),
[13]381 Command: irc.RPL_WELCOME,
[55]382 Params: []string{dc.nick, "Welcome to jounce, " + dc.nick},
[54]383 })
[55]384 dc.SendMessage(&irc.Message{
385 Prefix: dc.srv.prefix(),
[13]386 Command: irc.RPL_YOURHOST,
[55]387 Params: []string{dc.nick, "Your host is " + dc.srv.Hostname},
[54]388 })
[55]389 dc.SendMessage(&irc.Message{
390 Prefix: dc.srv.prefix(),
[13]391 Command: irc.RPL_CREATED,
[55]392 Params: []string{dc.nick, "Who cares when the server was created?"},
[54]393 })
[55]394 dc.SendMessage(&irc.Message{
395 Prefix: dc.srv.prefix(),
[13]396 Command: irc.RPL_MYINFO,
[55]397 Params: []string{dc.nick, dc.srv.Hostname, "jounce", "aiwroO", "OovaimnqpsrtklbeI"},
[54]398 })
[55]399 dc.SendMessage(&irc.Message{
400 Prefix: dc.srv.prefix(),
[13]401 Command: irc.ERR_NOMOTD,
[55]402 Params: []string{dc.nick, "No MOTD"},
[54]403 })
[13]404
[73]405 dc.forEachUpstream(func(uc *upstreamConn) {
[30]406 // TODO: fix races accessing upstream connection data
407 for _, ch := range uc.channels {
408 if ch.complete {
[55]409 forwardChannel(dc, ch)
[30]410 }
411 }
[50]412
[73]413 historyName := dc.username
[57]414
415 var seqPtr *uint64
416 if firstDownstream {
417 seq, ok := uc.history[historyName]
418 if ok {
419 seqPtr = &seq
[50]420 }
421 }
[57]422
[59]423 consumer, ch := uc.ring.NewConsumer(seqPtr)
[57]424 go func() {
425 for {
426 var closed bool
427 select {
428 case <-ch:
[69]429 dc.consumptions <- consumption{consumer, uc}
[57]430 case <-dc.closed:
431 closed = true
432 }
433 if closed {
434 break
435 }
436 }
437
438 seq := consumer.Close()
439
440 dc.user.lock.Lock()
441 lastDownstream := len(dc.user.downstreamConns) == 0
442 dc.user.lock.Unlock()
443
444 if lastDownstream {
445 uc.history[historyName] = seq
446 }
447 }()
[39]448 })
[50]449
[13]450 return nil
451}
452
[55]453func (dc *downstreamConn) handleMessageRegistered(msg *irc.Message) error {
[13]454 switch msg.Command {
[42]455 case "USER":
[13]456 return ircError{&irc.Message{
457 Command: irc.ERR_ALREADYREGISTERED,
[55]458 Params: []string{dc.nick, "You may not reregister"},
[13]459 }}
[42]460 case "NICK":
[73]461 dc.forEachUpstream(func(uc *upstreamConn) {
[60]462 uc.SendMessage(msg)
[42]463 })
[69]464 case "JOIN", "PART":
[48]465 var name string
466 if err := parseMessageParams(msg, &name); err != nil {
467 return err
468 }
469
[69]470 uc, upstreamName, err := dc.unmarshalChannel(name)
471 if err != nil {
472 return ircError{&irc.Message{
473 Command: irc.ERR_NOSUCHCHANNEL,
474 Params: []string{name, err.Error()},
475 }}
[48]476 }
477
[69]478 uc.SendMessage(&irc.Message{
479 Command: msg.Command,
480 Params: []string{upstreamName},
481 })
[89]482
483 switch msg.Command {
484 case "JOIN":
485 err := dc.srv.db.StoreChannel(uc.network.ID, &Channel{
486 Name: upstreamName,
487 })
488 if err != nil {
489 dc.logger.Printf("failed to create channel %q in DB: %v", upstreamName, err)
490 }
491 case "PART":
492 if err := dc.srv.db.DeleteChannel(uc.network.ID, upstreamName); err != nil {
493 dc.logger.Printf("failed to delete channel %q in DB: %v", upstreamName, err)
494 }
495 }
[69]496 case "MODE":
497 if msg.Prefix == nil {
498 return fmt.Errorf("missing prefix")
[49]499 }
500
[46]501 var name string
502 if err := parseMessageParams(msg, &name); err != nil {
503 return err
504 }
505
506 var modeStr string
507 if len(msg.Params) > 1 {
508 modeStr = msg.Params[1]
509 }
510
511 if msg.Prefix.Name != name {
[69]512 uc, upstreamName, err := dc.unmarshalChannel(name)
[46]513 if err != nil {
514 return err
515 }
516
517 if modeStr != "" {
[69]518 uc.SendMessage(&irc.Message{
519 Command: "MODE",
520 Params: []string{upstreamName, modeStr},
521 })
[46]522 } else {
[69]523 ch, ok := uc.channels[upstreamName]
524 if !ok {
525 return ircError{&irc.Message{
526 Command: irc.ERR_NOSUCHCHANNEL,
527 Params: []string{name, "No such channel"},
528 }}
529 }
530
[55]531 dc.SendMessage(&irc.Message{
532 Prefix: dc.srv.prefix(),
[46]533 Command: irc.RPL_CHANNELMODEIS,
[69]534 Params: []string{name, string(ch.modes)},
[54]535 })
[46]536 }
537 } else {
[55]538 if name != dc.nick {
[46]539 return ircError{&irc.Message{
540 Command: irc.ERR_USERSDONTMATCH,
[55]541 Params: []string{dc.nick, "Cannot change mode for other users"},
[46]542 }}
543 }
544
545 if modeStr != "" {
[73]546 dc.forEachUpstream(func(uc *upstreamConn) {
[69]547 uc.SendMessage(&irc.Message{
548 Command: "MODE",
549 Params: []string{uc.nick, modeStr},
550 })
[46]551 })
552 } else {
[55]553 dc.SendMessage(&irc.Message{
554 Prefix: dc.srv.prefix(),
[46]555 Command: irc.RPL_UMODEIS,
556 Params: []string{""}, // TODO
[54]557 })
[46]558 }
559 }
[58]560 case "PRIVMSG":
561 var targetsStr, text string
562 if err := parseMessageParams(msg, &targetsStr, &text); err != nil {
563 return err
564 }
565
566 for _, name := range strings.Split(targetsStr, ",") {
[69]567 uc, upstreamName, err := dc.unmarshalChannel(name)
[58]568 if err != nil {
569 return err
570 }
571
[69]572 uc.SendMessage(&irc.Message{
[58]573 Command: "PRIVMSG",
[69]574 Params: []string{upstreamName, text},
[60]575 })
[58]576 }
[13]577 default:
[55]578 dc.logger.Printf("unhandled message: %v", msg)
[13]579 return newUnknownCommandError(msg.Command)
580 }
[42]581 return nil
[13]582}
Note: See TracBrowser for help on using the repository browser.