source: code/trunk/downstream.go@ 68

Last change on this file since 68 was 68, checked in by contact, 5 years ago

Fix PING handlers, again

File size: 8.9 KB
RevLine 
[13]1package jounce
2
3import (
4 "fmt"
5 "io"
6 "net"
[39]7 "strings"
[13]8
9 "gopkg.in/irc.v3"
10)
11
12type ircError struct {
13 Message *irc.Message
14}
15
16func newUnknownCommandError(cmd string) ircError {
17 return ircError{&irc.Message{
18 Command: irc.ERR_UNKNOWNCOMMAND,
19 Params: []string{
20 "*",
21 cmd,
22 "Unknown command",
23 },
24 }}
25}
26
27func newNeedMoreParamsError(cmd string) ircError {
28 return ircError{&irc.Message{
29 Command: irc.ERR_NEEDMOREPARAMS,
30 Params: []string{
31 "*",
32 cmd,
33 "Not enough parameters",
34 },
35 }}
36}
37
38func (err ircError) Error() string {
39 return err.Message.String()
40}
41
42type downstreamConn struct {
[57]43 net net.Conn
44 irc *irc.Conn
45 srv *Server
46 logger Logger
47 messages chan *irc.Message
48 consumers chan *RingConsumer
49 closed chan struct{}
[22]50
[13]51 registered bool
[37]52 user *user
[13]53 nick string
54 username string
55 realname string
56}
57
[22]58func newDownstreamConn(srv *Server, netConn net.Conn) *downstreamConn {
[55]59 dc := &downstreamConn{
[57]60 net: netConn,
61 irc: irc.NewConn(netConn),
62 srv: srv,
63 logger: &prefixLogger{srv.Logger, fmt.Sprintf("downstream %q: ", netConn.RemoteAddr())},
64 messages: make(chan *irc.Message, 64),
65 consumers: make(chan *RingConsumer),
66 closed: make(chan struct{}),
[22]67 }
[26]68
69 go func() {
[56]70 if err := dc.writeMessages(); err != nil {
71 dc.logger.Printf("failed to write message: %v", err)
[26]72 }
[55]73 if err := dc.net.Close(); err != nil {
74 dc.logger.Printf("failed to close connection: %v", err)
[45]75 } else {
[55]76 dc.logger.Printf("connection closed")
[45]77 }
[26]78 }()
79
[55]80 return dc
[22]81}
82
[55]83func (dc *downstreamConn) prefix() *irc.Prefix {
[27]84 return &irc.Prefix{
[55]85 Name: dc.nick,
86 User: dc.username,
[27]87 // TODO: fill the host?
88 }
89}
90
[57]91func (dc *downstreamConn) isClosed() bool {
92 select {
93 case <-dc.closed:
94 return true
95 default:
96 return false
97 }
98}
99
[55]100func (dc *downstreamConn) readMessages() error {
101 dc.logger.Printf("new connection")
[22]102
103 for {
[55]104 msg, err := dc.irc.ReadMessage()
[22]105 if err == io.EOF {
106 break
107 } else if err != nil {
108 return fmt.Errorf("failed to read IRC command: %v", err)
109 }
110
[64]111 if dc.srv.Debug {
112 dc.logger.Printf("received: %v", msg)
113 }
114
[55]115 err = dc.handleMessage(msg)
[22]116 if ircErr, ok := err.(ircError); ok {
[55]117 ircErr.Message.Prefix = dc.srv.prefix()
118 dc.SendMessage(ircErr.Message)
[22]119 } else if err != nil {
120 return fmt.Errorf("failed to handle IRC command %q: %v", msg.Command, err)
121 }
122
[57]123 if dc.isClosed() {
[22]124 return nil
125 }
126 }
127
[45]128 return nil
[22]129}
130
[56]131func (dc *downstreamConn) writeMessages() error {
[57]132 for {
133 var err error
134 var closed bool
135 select {
136 case msg := <-dc.messages:
[64]137 if dc.srv.Debug {
138 dc.logger.Printf("sent: %v", msg)
139 }
[57]140 err = dc.irc.WriteMessage(msg)
141 case consumer := <-dc.consumers:
142 for {
143 msg := consumer.Peek()
144 if msg == nil {
145 break
146 }
[64]147 if dc.srv.Debug {
148 dc.logger.Printf("sent: %v", msg)
149 }
[57]150 err = dc.irc.WriteMessage(msg)
151 if err != nil {
152 break
153 }
154 consumer.Consume()
155 }
156 case <-dc.closed:
157 closed = true
158 }
159 if err != nil {
[56]160 return err
161 }
[57]162 if closed {
163 break
164 }
[56]165 }
166 return nil
167}
168
[55]169func (dc *downstreamConn) Close() error {
[57]170 if dc.isClosed() {
[26]171 return fmt.Errorf("downstream connection already closed")
172 }
[40]173
[55]174 if u := dc.user; u != nil {
[40]175 u.lock.Lock()
176 for i := range u.downstreamConns {
[55]177 if u.downstreamConns[i] == dc {
[40]178 u.downstreamConns = append(u.downstreamConns[:i], u.downstreamConns[i+1:]...)
[63]179 break
[40]180 }
181 }
182 u.lock.Unlock()
[13]183 }
[40]184
[57]185 close(dc.closed)
[45]186 return nil
[13]187}
188
[55]189func (dc *downstreamConn) SendMessage(msg *irc.Message) {
190 dc.messages <- msg
[54]191}
192
[55]193func (dc *downstreamConn) handleMessage(msg *irc.Message) error {
[13]194 switch msg.Command {
[28]195 case "QUIT":
[55]196 return dc.Close()
[13]197 case "PING":
[55]198 dc.SendMessage(&irc.Message{
199 Prefix: dc.srv.prefix(),
[13]200 Command: "PONG",
[68]201 Params: msg.Params,
[54]202 })
[26]203 return nil
[13]204 default:
[55]205 if dc.registered {
206 return dc.handleMessageRegistered(msg)
[13]207 } else {
[55]208 return dc.handleMessageUnregistered(msg)
[13]209 }
210 }
211}
212
[55]213func (dc *downstreamConn) handleMessageUnregistered(msg *irc.Message) error {
[13]214 switch msg.Command {
215 case "NICK":
[55]216 if err := parseMessageParams(msg, &dc.nick); err != nil {
[43]217 return err
[13]218 }
219 case "USER":
[43]220 var username string
[55]221 if err := parseMessageParams(msg, &username, nil, nil, &dc.realname); err != nil {
[43]222 return err
[13]223 }
[55]224 dc.username = "~" + username
[13]225 default:
[55]226 dc.logger.Printf("unhandled message: %v", msg)
[13]227 return newUnknownCommandError(msg.Command)
228 }
[55]229 if dc.username != "" && dc.nick != "" {
230 return dc.register()
[13]231 }
232 return nil
233}
234
[55]235func (dc *downstreamConn) register() error {
236 u := dc.srv.getUser(strings.TrimPrefix(dc.username, "~"))
[38]237 if u == nil {
[55]238 dc.logger.Printf("failed authentication: unknown username %q", dc.username)
239 dc.SendMessage(&irc.Message{
240 Prefix: dc.srv.prefix(),
[37]241 Command: irc.ERR_PASSWDMISMATCH,
242 Params: []string{"*", "Invalid username or password"},
[54]243 })
[37]244 return nil
245 }
246
[55]247 dc.registered = true
248 dc.user = u
[13]249
[40]250 u.lock.Lock()
[57]251 firstDownstream := len(u.downstreamConns) == 0
[55]252 u.downstreamConns = append(u.downstreamConns, dc)
[40]253 u.lock.Unlock()
254
[55]255 dc.SendMessage(&irc.Message{
256 Prefix: dc.srv.prefix(),
[13]257 Command: irc.RPL_WELCOME,
[55]258 Params: []string{dc.nick, "Welcome to jounce, " + dc.nick},
[54]259 })
[55]260 dc.SendMessage(&irc.Message{
261 Prefix: dc.srv.prefix(),
[13]262 Command: irc.RPL_YOURHOST,
[55]263 Params: []string{dc.nick, "Your host is " + dc.srv.Hostname},
[54]264 })
[55]265 dc.SendMessage(&irc.Message{
266 Prefix: dc.srv.prefix(),
[13]267 Command: irc.RPL_CREATED,
[55]268 Params: []string{dc.nick, "Who cares when the server was created?"},
[54]269 })
[55]270 dc.SendMessage(&irc.Message{
271 Prefix: dc.srv.prefix(),
[13]272 Command: irc.RPL_MYINFO,
[55]273 Params: []string{dc.nick, dc.srv.Hostname, "jounce", "aiwroO", "OovaimnqpsrtklbeI"},
[54]274 })
[55]275 dc.SendMessage(&irc.Message{
276 Prefix: dc.srv.prefix(),
[13]277 Command: irc.ERR_NOMOTD,
[55]278 Params: []string{dc.nick, "No MOTD"},
[54]279 })
[13]280
[39]281 u.forEachUpstream(func(uc *upstreamConn) {
[30]282 // TODO: fix races accessing upstream connection data
283 for _, ch := range uc.channels {
284 if ch.complete {
[55]285 forwardChannel(dc, ch)
[30]286 }
287 }
[50]288
[51]289 // TODO: let clients specify the ring buffer name in their username
[57]290 historyName := ""
291
292 var seqPtr *uint64
293 if firstDownstream {
294 seq, ok := uc.history[historyName]
295 if ok {
296 seqPtr = &seq
[50]297 }
298 }
[57]299
[59]300 consumer, ch := uc.ring.NewConsumer(seqPtr)
[57]301 go func() {
302 for {
303 var closed bool
304 select {
305 case <-ch:
306 dc.consumers <- consumer
307 case <-dc.closed:
308 closed = true
309 }
310 if closed {
311 break
312 }
313 }
314
315 seq := consumer.Close()
316
317 dc.user.lock.Lock()
318 lastDownstream := len(dc.user.downstreamConns) == 0
319 dc.user.lock.Unlock()
320
321 if lastDownstream {
322 uc.history[historyName] = seq
323 }
324 }()
[39]325 })
[50]326
[13]327 return nil
328}
329
[55]330func (dc *downstreamConn) handleMessageRegistered(msg *irc.Message) error {
[13]331 switch msg.Command {
[42]332 case "USER":
[13]333 return ircError{&irc.Message{
334 Command: irc.ERR_ALREADYREGISTERED,
[55]335 Params: []string{dc.nick, "You may not reregister"},
[13]336 }}
[42]337 case "NICK":
[55]338 dc.user.forEachUpstream(func(uc *upstreamConn) {
[60]339 uc.SendMessage(msg)
[42]340 })
[48]341 case "JOIN":
342 var name string
343 if err := parseMessageParams(msg, &name); err != nil {
344 return err
345 }
346
[55]347 if ch, _ := dc.user.getChannel(name); ch != nil {
[48]348 break // already joined
349 }
350
351 // TODO: extract network name from channel name
352 return ircError{&irc.Message{
353 Command: irc.ERR_NOSUCHCHANNEL,
354 Params: []string{name, "Channel name ambiguous"},
355 }}
[49]356 case "PART":
357 var name string
358 if err := parseMessageParams(msg, &name); err != nil {
359 return err
360 }
361
[55]362 ch, err := dc.user.getChannel(name)
[49]363 if err != nil {
364 return err
365 }
366
[60]367 ch.conn.SendMessage(msg)
[49]368 // TODO: remove channel from upstream config
[46]369 case "MODE":
370 var name string
371 if err := parseMessageParams(msg, &name); err != nil {
372 return err
373 }
374
375 var modeStr string
376 if len(msg.Params) > 1 {
377 modeStr = msg.Params[1]
378 }
379
380 if msg.Prefix.Name != name {
[55]381 ch, err := dc.user.getChannel(name)
[46]382 if err != nil {
383 return err
384 }
385
386 if modeStr != "" {
[60]387 ch.conn.SendMessage(msg)
[46]388 } else {
[55]389 dc.SendMessage(&irc.Message{
390 Prefix: dc.srv.prefix(),
[46]391 Command: irc.RPL_CHANNELMODEIS,
392 Params: []string{ch.Name, string(ch.modes)},
[54]393 })
[46]394 }
395 } else {
[55]396 if name != dc.nick {
[46]397 return ircError{&irc.Message{
398 Command: irc.ERR_USERSDONTMATCH,
[55]399 Params: []string{dc.nick, "Cannot change mode for other users"},
[46]400 }}
401 }
402
403 if modeStr != "" {
[55]404 dc.user.forEachUpstream(func(uc *upstreamConn) {
[60]405 uc.SendMessage(msg)
[46]406 })
407 } else {
[55]408 dc.SendMessage(&irc.Message{
409 Prefix: dc.srv.prefix(),
[46]410 Command: irc.RPL_UMODEIS,
411 Params: []string{""}, // TODO
[54]412 })
[46]413 }
414 }
[58]415 case "PRIVMSG":
416 var targetsStr, text string
417 if err := parseMessageParams(msg, &targetsStr, &text); err != nil {
418 return err
419 }
420
421 for _, name := range strings.Split(targetsStr, ",") {
422 ch, err := dc.user.getChannel(name)
423 if err != nil {
424 return err
425 }
426
[60]427 ch.conn.SendMessage(&irc.Message{
[58]428 Prefix: msg.Prefix,
429 Command: "PRIVMSG",
430 Params: []string{name, text},
[60]431 })
[58]432 }
[13]433 default:
[55]434 dc.logger.Printf("unhandled message: %v", msg)
[13]435 return newUnknownCommandError(msg.Command)
436 }
[42]437 return nil
[13]438}
Note: See TracBrowser for help on using the repository browser.