source: code/trunk/downstream.go@ 68

Last change on this file since 68 was 68, checked in by contact, 5 years ago

Fix PING handlers, again

File size: 8.9 KB
Line 
1package jounce
2
3import (
4 "fmt"
5 "io"
6 "net"
7 "strings"
8
9 "gopkg.in/irc.v3"
10)
11
// ircError is an error that carries a ready-made IRC reply. readMessages
// recognises this type, fills in the server prefix and sends Message back to
// the client instead of aborting the connection.
type ircError struct {
	// Message is the reply to send to the client.
	Message *irc.Message
}
15
16func newUnknownCommandError(cmd string) ircError {
17 return ircError{&irc.Message{
18 Command: irc.ERR_UNKNOWNCOMMAND,
19 Params: []string{
20 "*",
21 cmd,
22 "Unknown command",
23 },
24 }}
25}
26
27func newNeedMoreParamsError(cmd string) ircError {
28 return ircError{&irc.Message{
29 Command: irc.ERR_NEEDMOREPARAMS,
30 Params: []string{
31 "*",
32 cmd,
33 "Not enough parameters",
34 },
35 }}
36}
37
38func (err ircError) Error() string {
39 return err.Message.String()
40}
41
42type downstreamConn struct {
43 net net.Conn
44 irc *irc.Conn
45 srv *Server
46 logger Logger
47 messages chan *irc.Message
48 consumers chan *RingConsumer
49 closed chan struct{}
50
51 registered bool
52 user *user
53 nick string
54 username string
55 realname string
56}
57
58func newDownstreamConn(srv *Server, netConn net.Conn) *downstreamConn {
59 dc := &downstreamConn{
60 net: netConn,
61 irc: irc.NewConn(netConn),
62 srv: srv,
63 logger: &prefixLogger{srv.Logger, fmt.Sprintf("downstream %q: ", netConn.RemoteAddr())},
64 messages: make(chan *irc.Message, 64),
65 consumers: make(chan *RingConsumer),
66 closed: make(chan struct{}),
67 }
68
69 go func() {
70 if err := dc.writeMessages(); err != nil {
71 dc.logger.Printf("failed to write message: %v", err)
72 }
73 if err := dc.net.Close(); err != nil {
74 dc.logger.Printf("failed to close connection: %v", err)
75 } else {
76 dc.logger.Printf("connection closed")
77 }
78 }()
79
80 return dc
81}
82
83func (dc *downstreamConn) prefix() *irc.Prefix {
84 return &irc.Prefix{
85 Name: dc.nick,
86 User: dc.username,
87 // TODO: fill the host?
88 }
89}
90
91func (dc *downstreamConn) isClosed() bool {
92 select {
93 case <-dc.closed:
94 return true
95 default:
96 return false
97 }
98}
99
100func (dc *downstreamConn) readMessages() error {
101 dc.logger.Printf("new connection")
102
103 for {
104 msg, err := dc.irc.ReadMessage()
105 if err == io.EOF {
106 break
107 } else if err != nil {
108 return fmt.Errorf("failed to read IRC command: %v", err)
109 }
110
111 if dc.srv.Debug {
112 dc.logger.Printf("received: %v", msg)
113 }
114
115 err = dc.handleMessage(msg)
116 if ircErr, ok := err.(ircError); ok {
117 ircErr.Message.Prefix = dc.srv.prefix()
118 dc.SendMessage(ircErr.Message)
119 } else if err != nil {
120 return fmt.Errorf("failed to handle IRC command %q: %v", msg.Command, err)
121 }
122
123 if dc.isClosed() {
124 return nil
125 }
126 }
127
128 return nil
129}
130
131func (dc *downstreamConn) writeMessages() error {
132 for {
133 var err error
134 var closed bool
135 select {
136 case msg := <-dc.messages:
137 if dc.srv.Debug {
138 dc.logger.Printf("sent: %v", msg)
139 }
140 err = dc.irc.WriteMessage(msg)
141 case consumer := <-dc.consumers:
142 for {
143 msg := consumer.Peek()
144 if msg == nil {
145 break
146 }
147 if dc.srv.Debug {
148 dc.logger.Printf("sent: %v", msg)
149 }
150 err = dc.irc.WriteMessage(msg)
151 if err != nil {
152 break
153 }
154 consumer.Consume()
155 }
156 case <-dc.closed:
157 closed = true
158 }
159 if err != nil {
160 return err
161 }
162 if closed {
163 break
164 }
165 }
166 return nil
167}
168
169func (dc *downstreamConn) Close() error {
170 if dc.isClosed() {
171 return fmt.Errorf("downstream connection already closed")
172 }
173
174 if u := dc.user; u != nil {
175 u.lock.Lock()
176 for i := range u.downstreamConns {
177 if u.downstreamConns[i] == dc {
178 u.downstreamConns = append(u.downstreamConns[:i], u.downstreamConns[i+1:]...)
179 break
180 }
181 }
182 u.lock.Unlock()
183 }
184
185 close(dc.closed)
186 return nil
187}
188
189func (dc *downstreamConn) SendMessage(msg *irc.Message) {
190 dc.messages <- msg
191}
192
193func (dc *downstreamConn) handleMessage(msg *irc.Message) error {
194 switch msg.Command {
195 case "QUIT":
196 return dc.Close()
197 case "PING":
198 dc.SendMessage(&irc.Message{
199 Prefix: dc.srv.prefix(),
200 Command: "PONG",
201 Params: msg.Params,
202 })
203 return nil
204 default:
205 if dc.registered {
206 return dc.handleMessageRegistered(msg)
207 } else {
208 return dc.handleMessageUnregistered(msg)
209 }
210 }
211}
212
213func (dc *downstreamConn) handleMessageUnregistered(msg *irc.Message) error {
214 switch msg.Command {
215 case "NICK":
216 if err := parseMessageParams(msg, &dc.nick); err != nil {
217 return err
218 }
219 case "USER":
220 var username string
221 if err := parseMessageParams(msg, &username, nil, nil, &dc.realname); err != nil {
222 return err
223 }
224 dc.username = "~" + username
225 default:
226 dc.logger.Printf("unhandled message: %v", msg)
227 return newUnknownCommandError(msg.Command)
228 }
229 if dc.username != "" && dc.nick != "" {
230 return dc.register()
231 }
232 return nil
233}
234
235func (dc *downstreamConn) register() error {
236 u := dc.srv.getUser(strings.TrimPrefix(dc.username, "~"))
237 if u == nil {
238 dc.logger.Printf("failed authentication: unknown username %q", dc.username)
239 dc.SendMessage(&irc.Message{
240 Prefix: dc.srv.prefix(),
241 Command: irc.ERR_PASSWDMISMATCH,
242 Params: []string{"*", "Invalid username or password"},
243 })
244 return nil
245 }
246
247 dc.registered = true
248 dc.user = u
249
250 u.lock.Lock()
251 firstDownstream := len(u.downstreamConns) == 0
252 u.downstreamConns = append(u.downstreamConns, dc)
253 u.lock.Unlock()
254
255 dc.SendMessage(&irc.Message{
256 Prefix: dc.srv.prefix(),
257 Command: irc.RPL_WELCOME,
258 Params: []string{dc.nick, "Welcome to jounce, " + dc.nick},
259 })
260 dc.SendMessage(&irc.Message{
261 Prefix: dc.srv.prefix(),
262 Command: irc.RPL_YOURHOST,
263 Params: []string{dc.nick, "Your host is " + dc.srv.Hostname},
264 })
265 dc.SendMessage(&irc.Message{
266 Prefix: dc.srv.prefix(),
267 Command: irc.RPL_CREATED,
268 Params: []string{dc.nick, "Who cares when the server was created?"},
269 })
270 dc.SendMessage(&irc.Message{
271 Prefix: dc.srv.prefix(),
272 Command: irc.RPL_MYINFO,
273 Params: []string{dc.nick, dc.srv.Hostname, "jounce", "aiwroO", "OovaimnqpsrtklbeI"},
274 })
275 dc.SendMessage(&irc.Message{
276 Prefix: dc.srv.prefix(),
277 Command: irc.ERR_NOMOTD,
278 Params: []string{dc.nick, "No MOTD"},
279 })
280
281 u.forEachUpstream(func(uc *upstreamConn) {
282 // TODO: fix races accessing upstream connection data
283 for _, ch := range uc.channels {
284 if ch.complete {
285 forwardChannel(dc, ch)
286 }
287 }
288
289 // TODO: let clients specify the ring buffer name in their username
290 historyName := ""
291
292 var seqPtr *uint64
293 if firstDownstream {
294 seq, ok := uc.history[historyName]
295 if ok {
296 seqPtr = &seq
297 }
298 }
299
300 consumer, ch := uc.ring.NewConsumer(seqPtr)
301 go func() {
302 for {
303 var closed bool
304 select {
305 case <-ch:
306 dc.consumers <- consumer
307 case <-dc.closed:
308 closed = true
309 }
310 if closed {
311 break
312 }
313 }
314
315 seq := consumer.Close()
316
317 dc.user.lock.Lock()
318 lastDownstream := len(dc.user.downstreamConns) == 0
319 dc.user.lock.Unlock()
320
321 if lastDownstream {
322 uc.history[historyName] = seq
323 }
324 }()
325 })
326
327 return nil
328}
329
330func (dc *downstreamConn) handleMessageRegistered(msg *irc.Message) error {
331 switch msg.Command {
332 case "USER":
333 return ircError{&irc.Message{
334 Command: irc.ERR_ALREADYREGISTERED,
335 Params: []string{dc.nick, "You may not reregister"},
336 }}
337 case "NICK":
338 dc.user.forEachUpstream(func(uc *upstreamConn) {
339 uc.SendMessage(msg)
340 })
341 case "JOIN":
342 var name string
343 if err := parseMessageParams(msg, &name); err != nil {
344 return err
345 }
346
347 if ch, _ := dc.user.getChannel(name); ch != nil {
348 break // already joined
349 }
350
351 // TODO: extract network name from channel name
352 return ircError{&irc.Message{
353 Command: irc.ERR_NOSUCHCHANNEL,
354 Params: []string{name, "Channel name ambiguous"},
355 }}
356 case "PART":
357 var name string
358 if err := parseMessageParams(msg, &name); err != nil {
359 return err
360 }
361
362 ch, err := dc.user.getChannel(name)
363 if err != nil {
364 return err
365 }
366
367 ch.conn.SendMessage(msg)
368 // TODO: remove channel from upstream config
369 case "MODE":
370 var name string
371 if err := parseMessageParams(msg, &name); err != nil {
372 return err
373 }
374
375 var modeStr string
376 if len(msg.Params) > 1 {
377 modeStr = msg.Params[1]
378 }
379
380 if msg.Prefix.Name != name {
381 ch, err := dc.user.getChannel(name)
382 if err != nil {
383 return err
384 }
385
386 if modeStr != "" {
387 ch.conn.SendMessage(msg)
388 } else {
389 dc.SendMessage(&irc.Message{
390 Prefix: dc.srv.prefix(),
391 Command: irc.RPL_CHANNELMODEIS,
392 Params: []string{ch.Name, string(ch.modes)},
393 })
394 }
395 } else {
396 if name != dc.nick {
397 return ircError{&irc.Message{
398 Command: irc.ERR_USERSDONTMATCH,
399 Params: []string{dc.nick, "Cannot change mode for other users"},
400 }}
401 }
402
403 if modeStr != "" {
404 dc.user.forEachUpstream(func(uc *upstreamConn) {
405 uc.SendMessage(msg)
406 })
407 } else {
408 dc.SendMessage(&irc.Message{
409 Prefix: dc.srv.prefix(),
410 Command: irc.RPL_UMODEIS,
411 Params: []string{""}, // TODO
412 })
413 }
414 }
415 case "PRIVMSG":
416 var targetsStr, text string
417 if err := parseMessageParams(msg, &targetsStr, &text); err != nil {
418 return err
419 }
420
421 for _, name := range strings.Split(targetsStr, ",") {
422 ch, err := dc.user.getChannel(name)
423 if err != nil {
424 return err
425 }
426
427 ch.conn.SendMessage(&irc.Message{
428 Prefix: msg.Prefix,
429 Command: "PRIVMSG",
430 Params: []string{name, text},
431 })
432 }
433 default:
434 dc.logger.Printf("unhandled message: %v", msg)
435 return newUnknownCommandError(msg.Command)
436 }
437 return nil
438}
Note: See TracBrowser for help on using the repository browser.