source: code/trunk/downstream.go@ 69

Last change on this file since 69 was 69, checked in by contact, 5 years ago

Add functions to translate between upstream and downstream names

File size: 10.3 KB
RevLine 
[13]1package jounce
2
3import (
4 "fmt"
5 "io"
6 "net"
[39]7 "strings"
[13]8
9 "gopkg.in/irc.v3"
10)
11
12type ircError struct {
13 Message *irc.Message
14}
15
16func newUnknownCommandError(cmd string) ircError {
17 return ircError{&irc.Message{
18 Command: irc.ERR_UNKNOWNCOMMAND,
19 Params: []string{
20 "*",
21 cmd,
22 "Unknown command",
23 },
24 }}
25}
26
27func newNeedMoreParamsError(cmd string) ircError {
28 return ircError{&irc.Message{
29 Command: irc.ERR_NEEDMOREPARAMS,
30 Params: []string{
31 "*",
32 cmd,
33 "Not enough parameters",
34 },
35 }}
36}
37
38func (err ircError) Error() string {
39 return err.Message.String()
40}
41
[69]42type consumption struct {
43 consumer *RingConsumer
44 upstreamConn *upstreamConn
45}
46
[13]47type downstreamConn struct {
[69]48 net net.Conn
49 irc *irc.Conn
50 srv *Server
51 logger Logger
52 messages chan *irc.Message
53 consumptions chan consumption
54 closed chan struct{}
[22]55
[13]56 registered bool
[37]57 user *user
[13]58 nick string
59 username string
60 realname string
61}
62
[22]63func newDownstreamConn(srv *Server, netConn net.Conn) *downstreamConn {
[55]64 dc := &downstreamConn{
[69]65 net: netConn,
66 irc: irc.NewConn(netConn),
67 srv: srv,
68 logger: &prefixLogger{srv.Logger, fmt.Sprintf("downstream %q: ", netConn.RemoteAddr())},
69 messages: make(chan *irc.Message, 64),
70 consumptions: make(chan consumption),
71 closed: make(chan struct{}),
[22]72 }
[26]73
74 go func() {
[56]75 if err := dc.writeMessages(); err != nil {
76 dc.logger.Printf("failed to write message: %v", err)
[26]77 }
[55]78 if err := dc.net.Close(); err != nil {
79 dc.logger.Printf("failed to close connection: %v", err)
[45]80 } else {
[55]81 dc.logger.Printf("connection closed")
[45]82 }
[26]83 }()
84
[55]85 return dc
[22]86}
87
[55]88func (dc *downstreamConn) prefix() *irc.Prefix {
[27]89 return &irc.Prefix{
[55]90 Name: dc.nick,
91 User: dc.username,
[27]92 // TODO: fill the host?
93 }
94}
95
[69]96func (dc *downstreamConn) marshalChannel(uc *upstreamConn, name string) string {
97 return name
98}
99
100func (dc *downstreamConn) unmarshalChannel(name string) (*upstreamConn, string, error) {
101 // TODO: extract network name from channel name
102 ch, err := dc.user.getChannel(name)
103 if err != nil {
104 return nil, "", err
105 }
106 return ch.conn, ch.Name, nil
107}
108
109func (dc *downstreamConn) marshalNick(uc *upstreamConn, nick string) string {
110 if nick == uc.nick {
111 return dc.nick
112 }
113 return nick
114}
115
116func (dc *downstreamConn) marshalUserPrefix(uc *upstreamConn, prefix *irc.Prefix) *irc.Prefix {
117 if prefix.Name == uc.nick {
118 return dc.prefix()
119 }
120 return prefix
121}
122
[57]123func (dc *downstreamConn) isClosed() bool {
124 select {
125 case <-dc.closed:
126 return true
127 default:
128 return false
129 }
130}
131
[55]132func (dc *downstreamConn) readMessages() error {
133 dc.logger.Printf("new connection")
[22]134
135 for {
[55]136 msg, err := dc.irc.ReadMessage()
[22]137 if err == io.EOF {
138 break
139 } else if err != nil {
140 return fmt.Errorf("failed to read IRC command: %v", err)
141 }
142
[64]143 if dc.srv.Debug {
144 dc.logger.Printf("received: %v", msg)
145 }
146
[55]147 err = dc.handleMessage(msg)
[22]148 if ircErr, ok := err.(ircError); ok {
[55]149 ircErr.Message.Prefix = dc.srv.prefix()
150 dc.SendMessage(ircErr.Message)
[22]151 } else if err != nil {
152 return fmt.Errorf("failed to handle IRC command %q: %v", msg.Command, err)
153 }
154
[57]155 if dc.isClosed() {
[22]156 return nil
157 }
158 }
159
[45]160 return nil
[22]161}
162
[56]163func (dc *downstreamConn) writeMessages() error {
[57]164 for {
165 var err error
166 var closed bool
167 select {
168 case msg := <-dc.messages:
[64]169 if dc.srv.Debug {
170 dc.logger.Printf("sent: %v", msg)
171 }
[57]172 err = dc.irc.WriteMessage(msg)
[69]173 case consumption := <-dc.consumptions:
174 consumer, uc := consumption.consumer, consumption.upstreamConn
[57]175 for {
176 msg := consumer.Peek()
177 if msg == nil {
178 break
179 }
[69]180 msg = msg.Copy()
181 switch msg.Command {
182 case "PRIVMSG":
183 // TODO: detect whether it's a user or a channel
184 msg.Params[0] = dc.marshalChannel(uc, msg.Params[0])
185 default:
186 panic("expected to consume a PRIVMSG message")
187 }
[64]188 if dc.srv.Debug {
189 dc.logger.Printf("sent: %v", msg)
190 }
[57]191 err = dc.irc.WriteMessage(msg)
192 if err != nil {
193 break
194 }
195 consumer.Consume()
196 }
197 case <-dc.closed:
198 closed = true
199 }
200 if err != nil {
[56]201 return err
202 }
[57]203 if closed {
204 break
205 }
[56]206 }
207 return nil
208}
209
[55]210func (dc *downstreamConn) Close() error {
[57]211 if dc.isClosed() {
[26]212 return fmt.Errorf("downstream connection already closed")
213 }
[40]214
[55]215 if u := dc.user; u != nil {
[40]216 u.lock.Lock()
217 for i := range u.downstreamConns {
[55]218 if u.downstreamConns[i] == dc {
[40]219 u.downstreamConns = append(u.downstreamConns[:i], u.downstreamConns[i+1:]...)
[63]220 break
[40]221 }
222 }
223 u.lock.Unlock()
[13]224 }
[40]225
[57]226 close(dc.closed)
[45]227 return nil
[13]228}
229
[55]230func (dc *downstreamConn) SendMessage(msg *irc.Message) {
231 dc.messages <- msg
[54]232}
233
[55]234func (dc *downstreamConn) handleMessage(msg *irc.Message) error {
[13]235 switch msg.Command {
[28]236 case "QUIT":
[55]237 return dc.Close()
[13]238 case "PING":
[55]239 dc.SendMessage(&irc.Message{
240 Prefix: dc.srv.prefix(),
[13]241 Command: "PONG",
[68]242 Params: msg.Params,
[54]243 })
[26]244 return nil
[13]245 default:
[55]246 if dc.registered {
247 return dc.handleMessageRegistered(msg)
[13]248 } else {
[55]249 return dc.handleMessageUnregistered(msg)
[13]250 }
251 }
252}
253
[55]254func (dc *downstreamConn) handleMessageUnregistered(msg *irc.Message) error {
[13]255 switch msg.Command {
256 case "NICK":
[55]257 if err := parseMessageParams(msg, &dc.nick); err != nil {
[43]258 return err
[13]259 }
260 case "USER":
[43]261 var username string
[55]262 if err := parseMessageParams(msg, &username, nil, nil, &dc.realname); err != nil {
[43]263 return err
[13]264 }
[55]265 dc.username = "~" + username
[13]266 default:
[55]267 dc.logger.Printf("unhandled message: %v", msg)
[13]268 return newUnknownCommandError(msg.Command)
269 }
[55]270 if dc.username != "" && dc.nick != "" {
271 return dc.register()
[13]272 }
273 return nil
274}
275
[55]276func (dc *downstreamConn) register() error {
277 u := dc.srv.getUser(strings.TrimPrefix(dc.username, "~"))
[38]278 if u == nil {
[55]279 dc.logger.Printf("failed authentication: unknown username %q", dc.username)
280 dc.SendMessage(&irc.Message{
281 Prefix: dc.srv.prefix(),
[37]282 Command: irc.ERR_PASSWDMISMATCH,
283 Params: []string{"*", "Invalid username or password"},
[54]284 })
[37]285 return nil
286 }
287
[55]288 dc.registered = true
289 dc.user = u
[13]290
[40]291 u.lock.Lock()
[57]292 firstDownstream := len(u.downstreamConns) == 0
[55]293 u.downstreamConns = append(u.downstreamConns, dc)
[40]294 u.lock.Unlock()
295
[55]296 dc.SendMessage(&irc.Message{
297 Prefix: dc.srv.prefix(),
[13]298 Command: irc.RPL_WELCOME,
[55]299 Params: []string{dc.nick, "Welcome to jounce, " + dc.nick},
[54]300 })
[55]301 dc.SendMessage(&irc.Message{
302 Prefix: dc.srv.prefix(),
[13]303 Command: irc.RPL_YOURHOST,
[55]304 Params: []string{dc.nick, "Your host is " + dc.srv.Hostname},
[54]305 })
[55]306 dc.SendMessage(&irc.Message{
307 Prefix: dc.srv.prefix(),
[13]308 Command: irc.RPL_CREATED,
[55]309 Params: []string{dc.nick, "Who cares when the server was created?"},
[54]310 })
[55]311 dc.SendMessage(&irc.Message{
312 Prefix: dc.srv.prefix(),
[13]313 Command: irc.RPL_MYINFO,
[55]314 Params: []string{dc.nick, dc.srv.Hostname, "jounce", "aiwroO", "OovaimnqpsrtklbeI"},
[54]315 })
[55]316 dc.SendMessage(&irc.Message{
317 Prefix: dc.srv.prefix(),
[13]318 Command: irc.ERR_NOMOTD,
[55]319 Params: []string{dc.nick, "No MOTD"},
[54]320 })
[13]321
[39]322 u.forEachUpstream(func(uc *upstreamConn) {
[30]323 // TODO: fix races accessing upstream connection data
324 for _, ch := range uc.channels {
325 if ch.complete {
[55]326 forwardChannel(dc, ch)
[30]327 }
328 }
[50]329
[51]330 // TODO: let clients specify the ring buffer name in their username
[57]331 historyName := ""
332
333 var seqPtr *uint64
334 if firstDownstream {
335 seq, ok := uc.history[historyName]
336 if ok {
337 seqPtr = &seq
[50]338 }
339 }
[57]340
[59]341 consumer, ch := uc.ring.NewConsumer(seqPtr)
[57]342 go func() {
343 for {
344 var closed bool
345 select {
346 case <-ch:
[69]347 dc.consumptions <- consumption{consumer, uc}
[57]348 case <-dc.closed:
349 closed = true
350 }
351 if closed {
352 break
353 }
354 }
355
356 seq := consumer.Close()
357
358 dc.user.lock.Lock()
359 lastDownstream := len(dc.user.downstreamConns) == 0
360 dc.user.lock.Unlock()
361
362 if lastDownstream {
363 uc.history[historyName] = seq
364 }
365 }()
[39]366 })
[50]367
[13]368 return nil
369}
370
[55]371func (dc *downstreamConn) handleMessageRegistered(msg *irc.Message) error {
[13]372 switch msg.Command {
[42]373 case "USER":
[13]374 return ircError{&irc.Message{
375 Command: irc.ERR_ALREADYREGISTERED,
[55]376 Params: []string{dc.nick, "You may not reregister"},
[13]377 }}
[42]378 case "NICK":
[55]379 dc.user.forEachUpstream(func(uc *upstreamConn) {
[60]380 uc.SendMessage(msg)
[42]381 })
[69]382 case "JOIN", "PART":
[48]383 var name string
384 if err := parseMessageParams(msg, &name); err != nil {
385 return err
386 }
387
[69]388 uc, upstreamName, err := dc.unmarshalChannel(name)
389 if err != nil {
390 return ircError{&irc.Message{
391 Command: irc.ERR_NOSUCHCHANNEL,
392 Params: []string{name, err.Error()},
393 }}
[48]394 }
395
[69]396 uc.SendMessage(&irc.Message{
397 Command: msg.Command,
398 Params: []string{upstreamName},
399 })
400 // TODO: add/remove channel from upstream config
401 case "MODE":
402 if msg.Prefix == nil {
403 return fmt.Errorf("missing prefix")
[49]404 }
405
[46]406 var name string
407 if err := parseMessageParams(msg, &name); err != nil {
408 return err
409 }
410
411 var modeStr string
412 if len(msg.Params) > 1 {
413 modeStr = msg.Params[1]
414 }
415
416 if msg.Prefix.Name != name {
[69]417 uc, upstreamName, err := dc.unmarshalChannel(name)
[46]418 if err != nil {
419 return err
420 }
421
422 if modeStr != "" {
[69]423 uc.SendMessage(&irc.Message{
424 Prefix: uc.prefix(),
425 Command: "MODE",
426 Params: []string{upstreamName, modeStr},
427 })
[46]428 } else {
[69]429 ch, ok := uc.channels[upstreamName]
430 if !ok {
431 return ircError{&irc.Message{
432 Command: irc.ERR_NOSUCHCHANNEL,
433 Params: []string{name, "No such channel"},
434 }}
435 }
436
[55]437 dc.SendMessage(&irc.Message{
438 Prefix: dc.srv.prefix(),
[46]439 Command: irc.RPL_CHANNELMODEIS,
[69]440 Params: []string{name, string(ch.modes)},
[54]441 })
[46]442 }
443 } else {
[55]444 if name != dc.nick {
[46]445 return ircError{&irc.Message{
446 Command: irc.ERR_USERSDONTMATCH,
[55]447 Params: []string{dc.nick, "Cannot change mode for other users"},
[46]448 }}
449 }
450
451 if modeStr != "" {
[55]452 dc.user.forEachUpstream(func(uc *upstreamConn) {
[69]453 uc.SendMessage(&irc.Message{
454 Prefix: uc.prefix(),
455 Command: "MODE",
456 Params: []string{uc.nick, modeStr},
457 })
[46]458 })
459 } else {
[55]460 dc.SendMessage(&irc.Message{
461 Prefix: dc.srv.prefix(),
[46]462 Command: irc.RPL_UMODEIS,
463 Params: []string{""}, // TODO
[54]464 })
[46]465 }
466 }
[58]467 case "PRIVMSG":
468 var targetsStr, text string
469 if err := parseMessageParams(msg, &targetsStr, &text); err != nil {
470 return err
471 }
472
473 for _, name := range strings.Split(targetsStr, ",") {
[69]474 uc, upstreamName, err := dc.unmarshalChannel(name)
[58]475 if err != nil {
476 return err
477 }
478
[69]479 uc.SendMessage(&irc.Message{
480 Prefix: uc.prefix(),
[58]481 Command: "PRIVMSG",
[69]482 Params: []string{upstreamName, text},
[60]483 })
[58]484 }
[13]485 default:
[55]486 dc.logger.Printf("unhandled message: %v", msg)
[13]487 return newUnknownCommandError(msg.Command)
488 }
[42]489 return nil
[13]490}
Note: See TracBrowser for help on using the repository browser.