source: code/trunk/downstream.go@ 69

Last change on this file since 69 was 69, checked in by contact, 5 years ago

Add functions to translate between upstream and downstream names

File size: 10.3 KB
Line 
1package jounce
2
3import (
4 "fmt"
5 "io"
6 "net"
7 "strings"
8
9 "gopkg.in/irc.v3"
10)
11
12type ircError struct {
13 Message *irc.Message
14}
15
16func newUnknownCommandError(cmd string) ircError {
17 return ircError{&irc.Message{
18 Command: irc.ERR_UNKNOWNCOMMAND,
19 Params: []string{
20 "*",
21 cmd,
22 "Unknown command",
23 },
24 }}
25}
26
27func newNeedMoreParamsError(cmd string) ircError {
28 return ircError{&irc.Message{
29 Command: irc.ERR_NEEDMOREPARAMS,
30 Params: []string{
31 "*",
32 cmd,
33 "Not enough parameters",
34 },
35 }}
36}
37
38func (err ircError) Error() string {
39 return err.Message.String()
40}
41
42type consumption struct {
43 consumer *RingConsumer
44 upstreamConn *upstreamConn
45}
46
47type downstreamConn struct {
48 net net.Conn
49 irc *irc.Conn
50 srv *Server
51 logger Logger
52 messages chan *irc.Message
53 consumptions chan consumption
54 closed chan struct{}
55
56 registered bool
57 user *user
58 nick string
59 username string
60 realname string
61}
62
63func newDownstreamConn(srv *Server, netConn net.Conn) *downstreamConn {
64 dc := &downstreamConn{
65 net: netConn,
66 irc: irc.NewConn(netConn),
67 srv: srv,
68 logger: &prefixLogger{srv.Logger, fmt.Sprintf("downstream %q: ", netConn.RemoteAddr())},
69 messages: make(chan *irc.Message, 64),
70 consumptions: make(chan consumption),
71 closed: make(chan struct{}),
72 }
73
74 go func() {
75 if err := dc.writeMessages(); err != nil {
76 dc.logger.Printf("failed to write message: %v", err)
77 }
78 if err := dc.net.Close(); err != nil {
79 dc.logger.Printf("failed to close connection: %v", err)
80 } else {
81 dc.logger.Printf("connection closed")
82 }
83 }()
84
85 return dc
86}
87
88func (dc *downstreamConn) prefix() *irc.Prefix {
89 return &irc.Prefix{
90 Name: dc.nick,
91 User: dc.username,
92 // TODO: fill the host?
93 }
94}
95
96func (dc *downstreamConn) marshalChannel(uc *upstreamConn, name string) string {
97 return name
98}
99
100func (dc *downstreamConn) unmarshalChannel(name string) (*upstreamConn, string, error) {
101 // TODO: extract network name from channel name
102 ch, err := dc.user.getChannel(name)
103 if err != nil {
104 return nil, "", err
105 }
106 return ch.conn, ch.Name, nil
107}
108
109func (dc *downstreamConn) marshalNick(uc *upstreamConn, nick string) string {
110 if nick == uc.nick {
111 return dc.nick
112 }
113 return nick
114}
115
116func (dc *downstreamConn) marshalUserPrefix(uc *upstreamConn, prefix *irc.Prefix) *irc.Prefix {
117 if prefix.Name == uc.nick {
118 return dc.prefix()
119 }
120 return prefix
121}
122
123func (dc *downstreamConn) isClosed() bool {
124 select {
125 case <-dc.closed:
126 return true
127 default:
128 return false
129 }
130}
131
132func (dc *downstreamConn) readMessages() error {
133 dc.logger.Printf("new connection")
134
135 for {
136 msg, err := dc.irc.ReadMessage()
137 if err == io.EOF {
138 break
139 } else if err != nil {
140 return fmt.Errorf("failed to read IRC command: %v", err)
141 }
142
143 if dc.srv.Debug {
144 dc.logger.Printf("received: %v", msg)
145 }
146
147 err = dc.handleMessage(msg)
148 if ircErr, ok := err.(ircError); ok {
149 ircErr.Message.Prefix = dc.srv.prefix()
150 dc.SendMessage(ircErr.Message)
151 } else if err != nil {
152 return fmt.Errorf("failed to handle IRC command %q: %v", msg.Command, err)
153 }
154
155 if dc.isClosed() {
156 return nil
157 }
158 }
159
160 return nil
161}
162
163func (dc *downstreamConn) writeMessages() error {
164 for {
165 var err error
166 var closed bool
167 select {
168 case msg := <-dc.messages:
169 if dc.srv.Debug {
170 dc.logger.Printf("sent: %v", msg)
171 }
172 err = dc.irc.WriteMessage(msg)
173 case consumption := <-dc.consumptions:
174 consumer, uc := consumption.consumer, consumption.upstreamConn
175 for {
176 msg := consumer.Peek()
177 if msg == nil {
178 break
179 }
180 msg = msg.Copy()
181 switch msg.Command {
182 case "PRIVMSG":
183 // TODO: detect whether it's a user or a channel
184 msg.Params[0] = dc.marshalChannel(uc, msg.Params[0])
185 default:
186 panic("expected to consume a PRIVMSG message")
187 }
188 if dc.srv.Debug {
189 dc.logger.Printf("sent: %v", msg)
190 }
191 err = dc.irc.WriteMessage(msg)
192 if err != nil {
193 break
194 }
195 consumer.Consume()
196 }
197 case <-dc.closed:
198 closed = true
199 }
200 if err != nil {
201 return err
202 }
203 if closed {
204 break
205 }
206 }
207 return nil
208}
209
210func (dc *downstreamConn) Close() error {
211 if dc.isClosed() {
212 return fmt.Errorf("downstream connection already closed")
213 }
214
215 if u := dc.user; u != nil {
216 u.lock.Lock()
217 for i := range u.downstreamConns {
218 if u.downstreamConns[i] == dc {
219 u.downstreamConns = append(u.downstreamConns[:i], u.downstreamConns[i+1:]...)
220 break
221 }
222 }
223 u.lock.Unlock()
224 }
225
226 close(dc.closed)
227 return nil
228}
229
230func (dc *downstreamConn) SendMessage(msg *irc.Message) {
231 dc.messages <- msg
232}
233
234func (dc *downstreamConn) handleMessage(msg *irc.Message) error {
235 switch msg.Command {
236 case "QUIT":
237 return dc.Close()
238 case "PING":
239 dc.SendMessage(&irc.Message{
240 Prefix: dc.srv.prefix(),
241 Command: "PONG",
242 Params: msg.Params,
243 })
244 return nil
245 default:
246 if dc.registered {
247 return dc.handleMessageRegistered(msg)
248 } else {
249 return dc.handleMessageUnregistered(msg)
250 }
251 }
252}
253
254func (dc *downstreamConn) handleMessageUnregistered(msg *irc.Message) error {
255 switch msg.Command {
256 case "NICK":
257 if err := parseMessageParams(msg, &dc.nick); err != nil {
258 return err
259 }
260 case "USER":
261 var username string
262 if err := parseMessageParams(msg, &username, nil, nil, &dc.realname); err != nil {
263 return err
264 }
265 dc.username = "~" + username
266 default:
267 dc.logger.Printf("unhandled message: %v", msg)
268 return newUnknownCommandError(msg.Command)
269 }
270 if dc.username != "" && dc.nick != "" {
271 return dc.register()
272 }
273 return nil
274}
275
276func (dc *downstreamConn) register() error {
277 u := dc.srv.getUser(strings.TrimPrefix(dc.username, "~"))
278 if u == nil {
279 dc.logger.Printf("failed authentication: unknown username %q", dc.username)
280 dc.SendMessage(&irc.Message{
281 Prefix: dc.srv.prefix(),
282 Command: irc.ERR_PASSWDMISMATCH,
283 Params: []string{"*", "Invalid username or password"},
284 })
285 return nil
286 }
287
288 dc.registered = true
289 dc.user = u
290
291 u.lock.Lock()
292 firstDownstream := len(u.downstreamConns) == 0
293 u.downstreamConns = append(u.downstreamConns, dc)
294 u.lock.Unlock()
295
296 dc.SendMessage(&irc.Message{
297 Prefix: dc.srv.prefix(),
298 Command: irc.RPL_WELCOME,
299 Params: []string{dc.nick, "Welcome to jounce, " + dc.nick},
300 })
301 dc.SendMessage(&irc.Message{
302 Prefix: dc.srv.prefix(),
303 Command: irc.RPL_YOURHOST,
304 Params: []string{dc.nick, "Your host is " + dc.srv.Hostname},
305 })
306 dc.SendMessage(&irc.Message{
307 Prefix: dc.srv.prefix(),
308 Command: irc.RPL_CREATED,
309 Params: []string{dc.nick, "Who cares when the server was created?"},
310 })
311 dc.SendMessage(&irc.Message{
312 Prefix: dc.srv.prefix(),
313 Command: irc.RPL_MYINFO,
314 Params: []string{dc.nick, dc.srv.Hostname, "jounce", "aiwroO", "OovaimnqpsrtklbeI"},
315 })
316 dc.SendMessage(&irc.Message{
317 Prefix: dc.srv.prefix(),
318 Command: irc.ERR_NOMOTD,
319 Params: []string{dc.nick, "No MOTD"},
320 })
321
322 u.forEachUpstream(func(uc *upstreamConn) {
323 // TODO: fix races accessing upstream connection data
324 for _, ch := range uc.channels {
325 if ch.complete {
326 forwardChannel(dc, ch)
327 }
328 }
329
330 // TODO: let clients specify the ring buffer name in their username
331 historyName := ""
332
333 var seqPtr *uint64
334 if firstDownstream {
335 seq, ok := uc.history[historyName]
336 if ok {
337 seqPtr = &seq
338 }
339 }
340
341 consumer, ch := uc.ring.NewConsumer(seqPtr)
342 go func() {
343 for {
344 var closed bool
345 select {
346 case <-ch:
347 dc.consumptions <- consumption{consumer, uc}
348 case <-dc.closed:
349 closed = true
350 }
351 if closed {
352 break
353 }
354 }
355
356 seq := consumer.Close()
357
358 dc.user.lock.Lock()
359 lastDownstream := len(dc.user.downstreamConns) == 0
360 dc.user.lock.Unlock()
361
362 if lastDownstream {
363 uc.history[historyName] = seq
364 }
365 }()
366 })
367
368 return nil
369}
370
371func (dc *downstreamConn) handleMessageRegistered(msg *irc.Message) error {
372 switch msg.Command {
373 case "USER":
374 return ircError{&irc.Message{
375 Command: irc.ERR_ALREADYREGISTERED,
376 Params: []string{dc.nick, "You may not reregister"},
377 }}
378 case "NICK":
379 dc.user.forEachUpstream(func(uc *upstreamConn) {
380 uc.SendMessage(msg)
381 })
382 case "JOIN", "PART":
383 var name string
384 if err := parseMessageParams(msg, &name); err != nil {
385 return err
386 }
387
388 uc, upstreamName, err := dc.unmarshalChannel(name)
389 if err != nil {
390 return ircError{&irc.Message{
391 Command: irc.ERR_NOSUCHCHANNEL,
392 Params: []string{name, err.Error()},
393 }}
394 }
395
396 uc.SendMessage(&irc.Message{
397 Command: msg.Command,
398 Params: []string{upstreamName},
399 })
400 // TODO: add/remove channel from upstream config
401 case "MODE":
402 if msg.Prefix == nil {
403 return fmt.Errorf("missing prefix")
404 }
405
406 var name string
407 if err := parseMessageParams(msg, &name); err != nil {
408 return err
409 }
410
411 var modeStr string
412 if len(msg.Params) > 1 {
413 modeStr = msg.Params[1]
414 }
415
416 if msg.Prefix.Name != name {
417 uc, upstreamName, err := dc.unmarshalChannel(name)
418 if err != nil {
419 return err
420 }
421
422 if modeStr != "" {
423 uc.SendMessage(&irc.Message{
424 Prefix: uc.prefix(),
425 Command: "MODE",
426 Params: []string{upstreamName, modeStr},
427 })
428 } else {
429 ch, ok := uc.channels[upstreamName]
430 if !ok {
431 return ircError{&irc.Message{
432 Command: irc.ERR_NOSUCHCHANNEL,
433 Params: []string{name, "No such channel"},
434 }}
435 }
436
437 dc.SendMessage(&irc.Message{
438 Prefix: dc.srv.prefix(),
439 Command: irc.RPL_CHANNELMODEIS,
440 Params: []string{name, string(ch.modes)},
441 })
442 }
443 } else {
444 if name != dc.nick {
445 return ircError{&irc.Message{
446 Command: irc.ERR_USERSDONTMATCH,
447 Params: []string{dc.nick, "Cannot change mode for other users"},
448 }}
449 }
450
451 if modeStr != "" {
452 dc.user.forEachUpstream(func(uc *upstreamConn) {
453 uc.SendMessage(&irc.Message{
454 Prefix: uc.prefix(),
455 Command: "MODE",
456 Params: []string{uc.nick, modeStr},
457 })
458 })
459 } else {
460 dc.SendMessage(&irc.Message{
461 Prefix: dc.srv.prefix(),
462 Command: irc.RPL_UMODEIS,
463 Params: []string{""}, // TODO
464 })
465 }
466 }
467 case "PRIVMSG":
468 var targetsStr, text string
469 if err := parseMessageParams(msg, &targetsStr, &text); err != nil {
470 return err
471 }
472
473 for _, name := range strings.Split(targetsStr, ",") {
474 uc, upstreamName, err := dc.unmarshalChannel(name)
475 if err != nil {
476 return err
477 }
478
479 uc.SendMessage(&irc.Message{
480 Prefix: uc.prefix(),
481 Command: "PRIVMSG",
482 Params: []string{upstreamName, text},
483 })
484 }
485 default:
486 dc.logger.Printf("unhandled message: %v", msg)
487 return newUnknownCommandError(msg.Command)
488 }
489 return nil
490}
Note: See TracBrowser for help on using the repository browser.