source: code/trunk/downstream.go@ 76

Last change on this file since 76 was 76, checked in by contact, 5 years ago

Rename network to upstreamName

File size: 11.4 KB
RevLine 
[13]1package jounce
2
3import (
4 "fmt"
5 "io"
6 "net"
[39]7 "strings"
[13]8
9 "gopkg.in/irc.v3"
10)
11
12type ircError struct {
13 Message *irc.Message
14}
15
16func newUnknownCommandError(cmd string) ircError {
17 return ircError{&irc.Message{
18 Command: irc.ERR_UNKNOWNCOMMAND,
19 Params: []string{
20 "*",
21 cmd,
22 "Unknown command",
23 },
24 }}
25}
26
27func newNeedMoreParamsError(cmd string) ircError {
28 return ircError{&irc.Message{
29 Command: irc.ERR_NEEDMOREPARAMS,
30 Params: []string{
31 "*",
32 cmd,
33 "Not enough parameters",
34 },
35 }}
36}
37
38func (err ircError) Error() string {
39 return err.Message.String()
40}
41
[69]42type consumption struct {
43 consumer *RingConsumer
44 upstreamConn *upstreamConn
45}
46
[13]47type downstreamConn struct {
[69]48 net net.Conn
49 irc *irc.Conn
50 srv *Server
51 logger Logger
52 messages chan *irc.Message
53 consumptions chan consumption
54 closed chan struct{}
[22]55
[13]56 registered bool
[37]57 user *user
[13]58 nick string
59 username string
60 realname string
[73]61 upstream *Upstream
[13]62}
63
[22]64func newDownstreamConn(srv *Server, netConn net.Conn) *downstreamConn {
[55]65 dc := &downstreamConn{
[69]66 net: netConn,
67 irc: irc.NewConn(netConn),
68 srv: srv,
69 logger: &prefixLogger{srv.Logger, fmt.Sprintf("downstream %q: ", netConn.RemoteAddr())},
70 messages: make(chan *irc.Message, 64),
71 consumptions: make(chan consumption),
72 closed: make(chan struct{}),
[22]73 }
[26]74
75 go func() {
[56]76 if err := dc.writeMessages(); err != nil {
77 dc.logger.Printf("failed to write message: %v", err)
[26]78 }
[55]79 if err := dc.net.Close(); err != nil {
80 dc.logger.Printf("failed to close connection: %v", err)
[45]81 } else {
[55]82 dc.logger.Printf("connection closed")
[45]83 }
[26]84 }()
85
[55]86 return dc
[22]87}
88
[55]89func (dc *downstreamConn) prefix() *irc.Prefix {
[27]90 return &irc.Prefix{
[55]91 Name: dc.nick,
92 User: dc.username,
[27]93 // TODO: fill the host?
94 }
95}
96
[69]97func (dc *downstreamConn) marshalChannel(uc *upstreamConn, name string) string {
98 return name
99}
100
[73]101func (dc *downstreamConn) forEachUpstream(f func(*upstreamConn)) {
102 dc.user.forEachUpstream(func(uc *upstreamConn) {
103 if dc.upstream != nil && uc.upstream != dc.upstream {
104 return
105 }
106 f(uc)
107 })
108}
109
[69]110func (dc *downstreamConn) unmarshalChannel(name string) (*upstreamConn, string, error) {
[73]111 // TODO: extract network name from channel name if dc.upstream == nil
112 var channel *upstreamChannel
113 var err error
114 dc.forEachUpstream(func(uc *upstreamConn) {
115 if err != nil {
116 return
117 }
118 if ch, ok := uc.channels[name]; ok {
119 if channel != nil {
120 err = fmt.Errorf("ambiguous channel name %q", name)
121 } else {
122 channel = ch
123 }
124 }
125 })
126 if channel == nil {
127 return nil, "", ircError{&irc.Message{
128 Command: irc.ERR_NOSUCHCHANNEL,
129 Params: []string{name, "No such channel"},
130 }}
[69]131 }
[73]132 return channel.conn, channel.Name, nil
[69]133}
134
135func (dc *downstreamConn) marshalNick(uc *upstreamConn, nick string) string {
136 if nick == uc.nick {
137 return dc.nick
138 }
139 return nick
140}
141
142func (dc *downstreamConn) marshalUserPrefix(uc *upstreamConn, prefix *irc.Prefix) *irc.Prefix {
143 if prefix.Name == uc.nick {
144 return dc.prefix()
145 }
146 return prefix
147}
148
[57]149func (dc *downstreamConn) isClosed() bool {
150 select {
151 case <-dc.closed:
152 return true
153 default:
154 return false
155 }
156}
157
[55]158func (dc *downstreamConn) readMessages() error {
159 dc.logger.Printf("new connection")
[22]160
161 for {
[55]162 msg, err := dc.irc.ReadMessage()
[22]163 if err == io.EOF {
164 break
165 } else if err != nil {
166 return fmt.Errorf("failed to read IRC command: %v", err)
167 }
168
[64]169 if dc.srv.Debug {
170 dc.logger.Printf("received: %v", msg)
171 }
172
[55]173 err = dc.handleMessage(msg)
[22]174 if ircErr, ok := err.(ircError); ok {
[55]175 ircErr.Message.Prefix = dc.srv.prefix()
176 dc.SendMessage(ircErr.Message)
[22]177 } else if err != nil {
178 return fmt.Errorf("failed to handle IRC command %q: %v", msg.Command, err)
179 }
180
[57]181 if dc.isClosed() {
[22]182 return nil
183 }
184 }
185
[45]186 return nil
[22]187}
188
[56]189func (dc *downstreamConn) writeMessages() error {
[57]190 for {
191 var err error
192 var closed bool
193 select {
194 case msg := <-dc.messages:
[64]195 if dc.srv.Debug {
196 dc.logger.Printf("sent: %v", msg)
197 }
[57]198 err = dc.irc.WriteMessage(msg)
[69]199 case consumption := <-dc.consumptions:
200 consumer, uc := consumption.consumer, consumption.upstreamConn
[57]201 for {
202 msg := consumer.Peek()
203 if msg == nil {
204 break
205 }
[69]206 msg = msg.Copy()
207 switch msg.Command {
208 case "PRIVMSG":
209 // TODO: detect whether it's a user or a channel
210 msg.Params[0] = dc.marshalChannel(uc, msg.Params[0])
211 default:
212 panic("expected to consume a PRIVMSG message")
213 }
[64]214 if dc.srv.Debug {
215 dc.logger.Printf("sent: %v", msg)
216 }
[57]217 err = dc.irc.WriteMessage(msg)
218 if err != nil {
219 break
220 }
221 consumer.Consume()
222 }
223 case <-dc.closed:
224 closed = true
225 }
226 if err != nil {
[56]227 return err
228 }
[57]229 if closed {
230 break
231 }
[56]232 }
233 return nil
234}
235
[55]236func (dc *downstreamConn) Close() error {
[57]237 if dc.isClosed() {
[26]238 return fmt.Errorf("downstream connection already closed")
239 }
[40]240
[55]241 if u := dc.user; u != nil {
[40]242 u.lock.Lock()
243 for i := range u.downstreamConns {
[55]244 if u.downstreamConns[i] == dc {
[40]245 u.downstreamConns = append(u.downstreamConns[:i], u.downstreamConns[i+1:]...)
[63]246 break
[40]247 }
248 }
249 u.lock.Unlock()
[13]250 }
[40]251
[57]252 close(dc.closed)
[45]253 return nil
[13]254}
255
[55]256func (dc *downstreamConn) SendMessage(msg *irc.Message) {
257 dc.messages <- msg
[54]258}
259
[55]260func (dc *downstreamConn) handleMessage(msg *irc.Message) error {
[13]261 switch msg.Command {
[28]262 case "QUIT":
[55]263 return dc.Close()
[13]264 case "PING":
[55]265 dc.SendMessage(&irc.Message{
266 Prefix: dc.srv.prefix(),
[13]267 Command: "PONG",
[68]268 Params: msg.Params,
[54]269 })
[26]270 return nil
[13]271 default:
[55]272 if dc.registered {
273 return dc.handleMessageRegistered(msg)
[13]274 } else {
[55]275 return dc.handleMessageUnregistered(msg)
[13]276 }
277 }
278}
279
[55]280func (dc *downstreamConn) handleMessageUnregistered(msg *irc.Message) error {
[13]281 switch msg.Command {
282 case "NICK":
[55]283 if err := parseMessageParams(msg, &dc.nick); err != nil {
[43]284 return err
[13]285 }
286 case "USER":
[43]287 var username string
[55]288 if err := parseMessageParams(msg, &username, nil, nil, &dc.realname); err != nil {
[43]289 return err
[13]290 }
[55]291 dc.username = "~" + username
[13]292 default:
[55]293 dc.logger.Printf("unhandled message: %v", msg)
[13]294 return newUnknownCommandError(msg.Command)
295 }
[55]296 if dc.username != "" && dc.nick != "" {
297 return dc.register()
[13]298 }
299 return nil
300}
301
[55]302func (dc *downstreamConn) register() error {
[73]303 username := strings.TrimPrefix(dc.username, "~")
[76]304 var upstreamName string
[73]305 if i := strings.LastIndexAny(username, "/@"); i >= 0 {
[76]306 upstreamName = username[i+1:]
[73]307 }
308 if i := strings.IndexAny(username, "/@"); i >= 0 {
309 username = username[:i]
310 }
311
312 u := dc.srv.getUser(username)
[38]313 if u == nil {
[73]314 dc.logger.Printf("failed authentication: unknown username %q", username)
[55]315 dc.SendMessage(&irc.Message{
316 Prefix: dc.srv.prefix(),
[37]317 Command: irc.ERR_PASSWDMISMATCH,
318 Params: []string{"*", "Invalid username or password"},
[54]319 })
[37]320 return nil
321 }
322
[76]323 if upstreamName != "" {
324 dc.upstream = dc.user.getUpstream(upstreamName)
[73]325 if dc.upstream == nil {
[76]326 dc.logger.Printf("failed registration: unknown upstream %q", upstreamName)
[73]327 dc.SendMessage(&irc.Message{
328 Prefix: dc.srv.prefix(),
329 Command: irc.ERR_PASSWDMISMATCH,
[76]330 Params: []string{"*", fmt.Sprintf("Unknown upstream server %q", upstreamName)},
[73]331 })
332 return nil
333 }
334 }
335
[55]336 dc.registered = true
337 dc.user = u
[13]338
[40]339 u.lock.Lock()
[57]340 firstDownstream := len(u.downstreamConns) == 0
[55]341 u.downstreamConns = append(u.downstreamConns, dc)
[40]342 u.lock.Unlock()
343
[55]344 dc.SendMessage(&irc.Message{
345 Prefix: dc.srv.prefix(),
[13]346 Command: irc.RPL_WELCOME,
[55]347 Params: []string{dc.nick, "Welcome to jounce, " + dc.nick},
[54]348 })
[55]349 dc.SendMessage(&irc.Message{
350 Prefix: dc.srv.prefix(),
[13]351 Command: irc.RPL_YOURHOST,
[55]352 Params: []string{dc.nick, "Your host is " + dc.srv.Hostname},
[54]353 })
[55]354 dc.SendMessage(&irc.Message{
355 Prefix: dc.srv.prefix(),
[13]356 Command: irc.RPL_CREATED,
[55]357 Params: []string{dc.nick, "Who cares when the server was created?"},
[54]358 })
[55]359 dc.SendMessage(&irc.Message{
360 Prefix: dc.srv.prefix(),
[13]361 Command: irc.RPL_MYINFO,
[55]362 Params: []string{dc.nick, dc.srv.Hostname, "jounce", "aiwroO", "OovaimnqpsrtklbeI"},
[54]363 })
[55]364 dc.SendMessage(&irc.Message{
365 Prefix: dc.srv.prefix(),
[13]366 Command: irc.ERR_NOMOTD,
[55]367 Params: []string{dc.nick, "No MOTD"},
[54]368 })
[13]369
[73]370 dc.forEachUpstream(func(uc *upstreamConn) {
[30]371 // TODO: fix races accessing upstream connection data
372 for _, ch := range uc.channels {
373 if ch.complete {
[55]374 forwardChannel(dc, ch)
[30]375 }
376 }
[50]377
[73]378 historyName := dc.username
[57]379
380 var seqPtr *uint64
381 if firstDownstream {
382 seq, ok := uc.history[historyName]
383 if ok {
384 seqPtr = &seq
[50]385 }
386 }
[57]387
[59]388 consumer, ch := uc.ring.NewConsumer(seqPtr)
[57]389 go func() {
390 for {
391 var closed bool
392 select {
393 case <-ch:
[69]394 dc.consumptions <- consumption{consumer, uc}
[57]395 case <-dc.closed:
396 closed = true
397 }
398 if closed {
399 break
400 }
401 }
402
403 seq := consumer.Close()
404
405 dc.user.lock.Lock()
406 lastDownstream := len(dc.user.downstreamConns) == 0
407 dc.user.lock.Unlock()
408
409 if lastDownstream {
410 uc.history[historyName] = seq
411 }
412 }()
[39]413 })
[50]414
[13]415 return nil
416}
417
[55]418func (dc *downstreamConn) handleMessageRegistered(msg *irc.Message) error {
[13]419 switch msg.Command {
[42]420 case "USER":
[13]421 return ircError{&irc.Message{
422 Command: irc.ERR_ALREADYREGISTERED,
[55]423 Params: []string{dc.nick, "You may not reregister"},
[13]424 }}
[42]425 case "NICK":
[73]426 dc.forEachUpstream(func(uc *upstreamConn) {
[60]427 uc.SendMessage(msg)
[42]428 })
[69]429 case "JOIN", "PART":
[48]430 var name string
431 if err := parseMessageParams(msg, &name); err != nil {
432 return err
433 }
434
[69]435 uc, upstreamName, err := dc.unmarshalChannel(name)
436 if err != nil {
437 return ircError{&irc.Message{
438 Command: irc.ERR_NOSUCHCHANNEL,
439 Params: []string{name, err.Error()},
440 }}
[48]441 }
442
[69]443 uc.SendMessage(&irc.Message{
444 Command: msg.Command,
445 Params: []string{upstreamName},
446 })
447 // TODO: add/remove channel from upstream config
448 case "MODE":
449 if msg.Prefix == nil {
450 return fmt.Errorf("missing prefix")
[49]451 }
452
[46]453 var name string
454 if err := parseMessageParams(msg, &name); err != nil {
455 return err
456 }
457
458 var modeStr string
459 if len(msg.Params) > 1 {
460 modeStr = msg.Params[1]
461 }
462
463 if msg.Prefix.Name != name {
[69]464 uc, upstreamName, err := dc.unmarshalChannel(name)
[46]465 if err != nil {
466 return err
467 }
468
469 if modeStr != "" {
[69]470 uc.SendMessage(&irc.Message{
471 Command: "MODE",
472 Params: []string{upstreamName, modeStr},
473 })
[46]474 } else {
[69]475 ch, ok := uc.channels[upstreamName]
476 if !ok {
477 return ircError{&irc.Message{
478 Command: irc.ERR_NOSUCHCHANNEL,
479 Params: []string{name, "No such channel"},
480 }}
481 }
482
[55]483 dc.SendMessage(&irc.Message{
484 Prefix: dc.srv.prefix(),
[46]485 Command: irc.RPL_CHANNELMODEIS,
[69]486 Params: []string{name, string(ch.modes)},
[54]487 })
[46]488 }
489 } else {
[55]490 if name != dc.nick {
[46]491 return ircError{&irc.Message{
492 Command: irc.ERR_USERSDONTMATCH,
[55]493 Params: []string{dc.nick, "Cannot change mode for other users"},
[46]494 }}
495 }
496
497 if modeStr != "" {
[73]498 dc.forEachUpstream(func(uc *upstreamConn) {
[69]499 uc.SendMessage(&irc.Message{
500 Command: "MODE",
501 Params: []string{uc.nick, modeStr},
502 })
[46]503 })
504 } else {
[55]505 dc.SendMessage(&irc.Message{
506 Prefix: dc.srv.prefix(),
[46]507 Command: irc.RPL_UMODEIS,
508 Params: []string{""}, // TODO
[54]509 })
[46]510 }
511 }
[58]512 case "PRIVMSG":
513 var targetsStr, text string
514 if err := parseMessageParams(msg, &targetsStr, &text); err != nil {
515 return err
516 }
517
518 for _, name := range strings.Split(targetsStr, ",") {
[69]519 uc, upstreamName, err := dc.unmarshalChannel(name)
[58]520 if err != nil {
521 return err
522 }
523
[69]524 uc.SendMessage(&irc.Message{
[58]525 Command: "PRIVMSG",
[69]526 Params: []string{upstreamName, text},
[60]527 })
[58]528 }
[13]529 default:
[55]530 dc.logger.Printf("unhandled message: %v", msg)
[13]531 return newUnknownCommandError(msg.Command)
532 }
[42]533 return nil
[13]534}
Note: See TracBrowser for help on using the repository browser.