source: code/trunk/downstream.go@ 64

Last change on this file since 64 was 64, checked in by contact, 5 years ago

Add a -debug flag

File size: 9.0 KB
Line 
1package jounce
2
3import (
4 "fmt"
5 "io"
6 "net"
7 "strings"
8
9 "gopkg.in/irc.v3"
10)
11
12type ircError struct {
13 Message *irc.Message
14}
15
16func newUnknownCommandError(cmd string) ircError {
17 return ircError{&irc.Message{
18 Command: irc.ERR_UNKNOWNCOMMAND,
19 Params: []string{
20 "*",
21 cmd,
22 "Unknown command",
23 },
24 }}
25}
26
27func newNeedMoreParamsError(cmd string) ircError {
28 return ircError{&irc.Message{
29 Command: irc.ERR_NEEDMOREPARAMS,
30 Params: []string{
31 "*",
32 cmd,
33 "Not enough parameters",
34 },
35 }}
36}
37
38func (err ircError) Error() string {
39 return err.Message.String()
40}
41
42type downstreamConn struct {
43 net net.Conn
44 irc *irc.Conn
45 srv *Server
46 logger Logger
47 messages chan *irc.Message
48 consumers chan *RingConsumer
49 closed chan struct{}
50
51 registered bool
52 user *user
53 nick string
54 username string
55 realname string
56}
57
58func newDownstreamConn(srv *Server, netConn net.Conn) *downstreamConn {
59 dc := &downstreamConn{
60 net: netConn,
61 irc: irc.NewConn(netConn),
62 srv: srv,
63 logger: &prefixLogger{srv.Logger, fmt.Sprintf("downstream %q: ", netConn.RemoteAddr())},
64 messages: make(chan *irc.Message, 64),
65 consumers: make(chan *RingConsumer),
66 closed: make(chan struct{}),
67 }
68
69 go func() {
70 if err := dc.writeMessages(); err != nil {
71 dc.logger.Printf("failed to write message: %v", err)
72 }
73 if err := dc.net.Close(); err != nil {
74 dc.logger.Printf("failed to close connection: %v", err)
75 } else {
76 dc.logger.Printf("connection closed")
77 }
78 }()
79
80 return dc
81}
82
83func (dc *downstreamConn) prefix() *irc.Prefix {
84 return &irc.Prefix{
85 Name: dc.nick,
86 User: dc.username,
87 // TODO: fill the host?
88 }
89}
90
91func (dc *downstreamConn) isClosed() bool {
92 select {
93 case <-dc.closed:
94 return true
95 default:
96 return false
97 }
98}
99
100func (dc *downstreamConn) readMessages() error {
101 dc.logger.Printf("new connection")
102
103 for {
104 msg, err := dc.irc.ReadMessage()
105 if err == io.EOF {
106 break
107 } else if err != nil {
108 return fmt.Errorf("failed to read IRC command: %v", err)
109 }
110
111 if dc.srv.Debug {
112 dc.logger.Printf("received: %v", msg)
113 }
114
115 err = dc.handleMessage(msg)
116 if ircErr, ok := err.(ircError); ok {
117 ircErr.Message.Prefix = dc.srv.prefix()
118 dc.SendMessage(ircErr.Message)
119 } else if err != nil {
120 return fmt.Errorf("failed to handle IRC command %q: %v", msg.Command, err)
121 }
122
123 if dc.isClosed() {
124 return nil
125 }
126 }
127
128 return nil
129}
130
131func (dc *downstreamConn) writeMessages() error {
132 for {
133 var err error
134 var closed bool
135 select {
136 case msg := <-dc.messages:
137 if dc.srv.Debug {
138 dc.logger.Printf("sent: %v", msg)
139 }
140 err = dc.irc.WriteMessage(msg)
141 case consumer := <-dc.consumers:
142 for {
143 msg := consumer.Peek()
144 if msg == nil {
145 break
146 }
147 if dc.srv.Debug {
148 dc.logger.Printf("sent: %v", msg)
149 }
150 err = dc.irc.WriteMessage(msg)
151 if err != nil {
152 break
153 }
154 consumer.Consume()
155 }
156 case <-dc.closed:
157 closed = true
158 }
159 if err != nil {
160 return err
161 }
162 if closed {
163 break
164 }
165 }
166 return nil
167}
168
169func (dc *downstreamConn) Close() error {
170 if dc.isClosed() {
171 return fmt.Errorf("downstream connection already closed")
172 }
173
174 if u := dc.user; u != nil {
175 u.lock.Lock()
176 for i := range u.downstreamConns {
177 if u.downstreamConns[i] == dc {
178 u.downstreamConns = append(u.downstreamConns[:i], u.downstreamConns[i+1:]...)
179 break
180 }
181 }
182 u.lock.Unlock()
183 }
184
185 close(dc.closed)
186 return nil
187}
188
189func (dc *downstreamConn) SendMessage(msg *irc.Message) {
190 dc.messages <- msg
191}
192
193func (dc *downstreamConn) handleMessage(msg *irc.Message) error {
194 switch msg.Command {
195 case "QUIT":
196 return dc.Close()
197 case "PING":
198 // TODO: handle params
199 dc.SendMessage(&irc.Message{
200 Prefix: dc.srv.prefix(),
201 Command: "PONG",
202 Params: []string{dc.srv.Hostname},
203 })
204 return nil
205 default:
206 if dc.registered {
207 return dc.handleMessageRegistered(msg)
208 } else {
209 return dc.handleMessageUnregistered(msg)
210 }
211 }
212}
213
214func (dc *downstreamConn) handleMessageUnregistered(msg *irc.Message) error {
215 switch msg.Command {
216 case "NICK":
217 if err := parseMessageParams(msg, &dc.nick); err != nil {
218 return err
219 }
220 case "USER":
221 var username string
222 if err := parseMessageParams(msg, &username, nil, nil, &dc.realname); err != nil {
223 return err
224 }
225 dc.username = "~" + username
226 default:
227 dc.logger.Printf("unhandled message: %v", msg)
228 return newUnknownCommandError(msg.Command)
229 }
230 if dc.username != "" && dc.nick != "" {
231 return dc.register()
232 }
233 return nil
234}
235
236func (dc *downstreamConn) register() error {
237 u := dc.srv.getUser(strings.TrimPrefix(dc.username, "~"))
238 if u == nil {
239 dc.logger.Printf("failed authentication: unknown username %q", dc.username)
240 dc.SendMessage(&irc.Message{
241 Prefix: dc.srv.prefix(),
242 Command: irc.ERR_PASSWDMISMATCH,
243 Params: []string{"*", "Invalid username or password"},
244 })
245 return nil
246 }
247
248 dc.registered = true
249 dc.user = u
250
251 u.lock.Lock()
252 firstDownstream := len(u.downstreamConns) == 0
253 u.downstreamConns = append(u.downstreamConns, dc)
254 u.lock.Unlock()
255
256 dc.SendMessage(&irc.Message{
257 Prefix: dc.srv.prefix(),
258 Command: irc.RPL_WELCOME,
259 Params: []string{dc.nick, "Welcome to jounce, " + dc.nick},
260 })
261 dc.SendMessage(&irc.Message{
262 Prefix: dc.srv.prefix(),
263 Command: irc.RPL_YOURHOST,
264 Params: []string{dc.nick, "Your host is " + dc.srv.Hostname},
265 })
266 dc.SendMessage(&irc.Message{
267 Prefix: dc.srv.prefix(),
268 Command: irc.RPL_CREATED,
269 Params: []string{dc.nick, "Who cares when the server was created?"},
270 })
271 dc.SendMessage(&irc.Message{
272 Prefix: dc.srv.prefix(),
273 Command: irc.RPL_MYINFO,
274 Params: []string{dc.nick, dc.srv.Hostname, "jounce", "aiwroO", "OovaimnqpsrtklbeI"},
275 })
276 dc.SendMessage(&irc.Message{
277 Prefix: dc.srv.prefix(),
278 Command: irc.ERR_NOMOTD,
279 Params: []string{dc.nick, "No MOTD"},
280 })
281
282 u.forEachUpstream(func(uc *upstreamConn) {
283 // TODO: fix races accessing upstream connection data
284 for _, ch := range uc.channels {
285 if ch.complete {
286 forwardChannel(dc, ch)
287 }
288 }
289
290 // TODO: let clients specify the ring buffer name in their username
291 historyName := ""
292
293 var seqPtr *uint64
294 if firstDownstream {
295 seq, ok := uc.history[historyName]
296 if ok {
297 seqPtr = &seq
298 }
299 }
300
301 consumer, ch := uc.ring.NewConsumer(seqPtr)
302 go func() {
303 for {
304 var closed bool
305 select {
306 case <-ch:
307 dc.consumers <- consumer
308 case <-dc.closed:
309 closed = true
310 }
311 if closed {
312 break
313 }
314 }
315
316 seq := consumer.Close()
317
318 dc.user.lock.Lock()
319 lastDownstream := len(dc.user.downstreamConns) == 0
320 dc.user.lock.Unlock()
321
322 if lastDownstream {
323 uc.history[historyName] = seq
324 }
325 }()
326 })
327
328 return nil
329}
330
331func (dc *downstreamConn) handleMessageRegistered(msg *irc.Message) error {
332 switch msg.Command {
333 case "USER":
334 return ircError{&irc.Message{
335 Command: irc.ERR_ALREADYREGISTERED,
336 Params: []string{dc.nick, "You may not reregister"},
337 }}
338 case "NICK":
339 dc.user.forEachUpstream(func(uc *upstreamConn) {
340 uc.SendMessage(msg)
341 })
342 case "JOIN":
343 var name string
344 if err := parseMessageParams(msg, &name); err != nil {
345 return err
346 }
347
348 if ch, _ := dc.user.getChannel(name); ch != nil {
349 break // already joined
350 }
351
352 // TODO: extract network name from channel name
353 return ircError{&irc.Message{
354 Command: irc.ERR_NOSUCHCHANNEL,
355 Params: []string{name, "Channel name ambiguous"},
356 }}
357 case "PART":
358 var name string
359 if err := parseMessageParams(msg, &name); err != nil {
360 return err
361 }
362
363 ch, err := dc.user.getChannel(name)
364 if err != nil {
365 return err
366 }
367
368 ch.conn.SendMessage(msg)
369 // TODO: remove channel from upstream config
370 case "MODE":
371 var name string
372 if err := parseMessageParams(msg, &name); err != nil {
373 return err
374 }
375
376 var modeStr string
377 if len(msg.Params) > 1 {
378 modeStr = msg.Params[1]
379 }
380
381 if msg.Prefix.Name != name {
382 ch, err := dc.user.getChannel(name)
383 if err != nil {
384 return err
385 }
386
387 if modeStr != "" {
388 ch.conn.SendMessage(msg)
389 } else {
390 dc.SendMessage(&irc.Message{
391 Prefix: dc.srv.prefix(),
392 Command: irc.RPL_CHANNELMODEIS,
393 Params: []string{ch.Name, string(ch.modes)},
394 })
395 }
396 } else {
397 if name != dc.nick {
398 return ircError{&irc.Message{
399 Command: irc.ERR_USERSDONTMATCH,
400 Params: []string{dc.nick, "Cannot change mode for other users"},
401 }}
402 }
403
404 if modeStr != "" {
405 dc.user.forEachUpstream(func(uc *upstreamConn) {
406 uc.SendMessage(msg)
407 })
408 } else {
409 dc.SendMessage(&irc.Message{
410 Prefix: dc.srv.prefix(),
411 Command: irc.RPL_UMODEIS,
412 Params: []string{""}, // TODO
413 })
414 }
415 }
416 case "PRIVMSG":
417 var targetsStr, text string
418 if err := parseMessageParams(msg, &targetsStr, &text); err != nil {
419 return err
420 }
421
422 for _, name := range strings.Split(targetsStr, ",") {
423 ch, err := dc.user.getChannel(name)
424 if err != nil {
425 return err
426 }
427
428 ch.conn.SendMessage(&irc.Message{
429 Prefix: msg.Prefix,
430 Command: "PRIVMSG",
431 Params: []string{name, text},
432 })
433 }
434 default:
435 dc.logger.Printf("unhandled message: %v", msg)
436 return newUnknownCommandError(msg.Command)
437 }
438 return nil
439}
Note: See TracBrowser for help on using the repository browser.