source: code/trunk/downstream.go@ 61

Last change on this file since 61 was 60, checked in by contact, 5 years ago

Add upstreamConn.SendMessage

Allows us to change upstreamConn implementation details without updating
the whole codebase.

File size: 8.7 KB
Line 
1package jounce
2
3import (
4 "fmt"
5 "io"
6 "net"
7 "strings"
8
9 "gopkg.in/irc.v3"
10)
11
12type ircError struct {
13 Message *irc.Message
14}
15
16func newUnknownCommandError(cmd string) ircError {
17 return ircError{&irc.Message{
18 Command: irc.ERR_UNKNOWNCOMMAND,
19 Params: []string{
20 "*",
21 cmd,
22 "Unknown command",
23 },
24 }}
25}
26
27func newNeedMoreParamsError(cmd string) ircError {
28 return ircError{&irc.Message{
29 Command: irc.ERR_NEEDMOREPARAMS,
30 Params: []string{
31 "*",
32 cmd,
33 "Not enough parameters",
34 },
35 }}
36}
37
38func (err ircError) Error() string {
39 return err.Message.String()
40}
41
42type downstreamConn struct {
43 net net.Conn
44 irc *irc.Conn
45 srv *Server
46 logger Logger
47 messages chan *irc.Message
48 consumers chan *RingConsumer
49 closed chan struct{}
50
51 registered bool
52 user *user
53 nick string
54 username string
55 realname string
56}
57
58func newDownstreamConn(srv *Server, netConn net.Conn) *downstreamConn {
59 dc := &downstreamConn{
60 net: netConn,
61 irc: irc.NewConn(netConn),
62 srv: srv,
63 logger: &prefixLogger{srv.Logger, fmt.Sprintf("downstream %q: ", netConn.RemoteAddr())},
64 messages: make(chan *irc.Message, 64),
65 consumers: make(chan *RingConsumer),
66 closed: make(chan struct{}),
67 }
68
69 go func() {
70 if err := dc.writeMessages(); err != nil {
71 dc.logger.Printf("failed to write message: %v", err)
72 }
73 if err := dc.net.Close(); err != nil {
74 dc.logger.Printf("failed to close connection: %v", err)
75 } else {
76 dc.logger.Printf("connection closed")
77 }
78 }()
79
80 return dc
81}
82
83func (dc *downstreamConn) prefix() *irc.Prefix {
84 return &irc.Prefix{
85 Name: dc.nick,
86 User: dc.username,
87 // TODO: fill the host?
88 }
89}
90
91func (dc *downstreamConn) isClosed() bool {
92 select {
93 case <-dc.closed:
94 return true
95 default:
96 return false
97 }
98}
99
100func (dc *downstreamConn) readMessages() error {
101 dc.logger.Printf("new connection")
102
103 for {
104 msg, err := dc.irc.ReadMessage()
105 if err == io.EOF {
106 break
107 } else if err != nil {
108 return fmt.Errorf("failed to read IRC command: %v", err)
109 }
110
111 err = dc.handleMessage(msg)
112 if ircErr, ok := err.(ircError); ok {
113 ircErr.Message.Prefix = dc.srv.prefix()
114 dc.SendMessage(ircErr.Message)
115 } else if err != nil {
116 return fmt.Errorf("failed to handle IRC command %q: %v", msg.Command, err)
117 }
118
119 if dc.isClosed() {
120 return nil
121 }
122 }
123
124 return nil
125}
126
127func (dc *downstreamConn) writeMessages() error {
128 for {
129 var err error
130 var closed bool
131 select {
132 case msg := <-dc.messages:
133 err = dc.irc.WriteMessage(msg)
134 case consumer := <-dc.consumers:
135 for {
136 msg := consumer.Peek()
137 if msg == nil {
138 break
139 }
140 err = dc.irc.WriteMessage(msg)
141 if err != nil {
142 break
143 }
144 consumer.Consume()
145 }
146 case <-dc.closed:
147 closed = true
148 }
149 if err != nil {
150 return err
151 }
152 if closed {
153 break
154 }
155 }
156 return nil
157}
158
159func (dc *downstreamConn) Close() error {
160 if dc.isClosed() {
161 return fmt.Errorf("downstream connection already closed")
162 }
163
164 if u := dc.user; u != nil {
165 u.lock.Lock()
166 for i := range u.downstreamConns {
167 if u.downstreamConns[i] == dc {
168 u.downstreamConns = append(u.downstreamConns[:i], u.downstreamConns[i+1:]...)
169 }
170 }
171 u.lock.Unlock()
172 }
173
174 close(dc.closed)
175 return nil
176}
177
178func (dc *downstreamConn) SendMessage(msg *irc.Message) {
179 dc.messages <- msg
180}
181
182func (dc *downstreamConn) handleMessage(msg *irc.Message) error {
183 switch msg.Command {
184 case "QUIT":
185 return dc.Close()
186 case "PING":
187 // TODO: handle params
188 dc.SendMessage(&irc.Message{
189 Prefix: dc.srv.prefix(),
190 Command: "PONG",
191 Params: []string{dc.srv.Hostname},
192 })
193 return nil
194 default:
195 if dc.registered {
196 return dc.handleMessageRegistered(msg)
197 } else {
198 return dc.handleMessageUnregistered(msg)
199 }
200 }
201}
202
203func (dc *downstreamConn) handleMessageUnregistered(msg *irc.Message) error {
204 switch msg.Command {
205 case "NICK":
206 if err := parseMessageParams(msg, &dc.nick); err != nil {
207 return err
208 }
209 case "USER":
210 var username string
211 if err := parseMessageParams(msg, &username, nil, nil, &dc.realname); err != nil {
212 return err
213 }
214 dc.username = "~" + username
215 default:
216 dc.logger.Printf("unhandled message: %v", msg)
217 return newUnknownCommandError(msg.Command)
218 }
219 if dc.username != "" && dc.nick != "" {
220 return dc.register()
221 }
222 return nil
223}
224
225func (dc *downstreamConn) register() error {
226 u := dc.srv.getUser(strings.TrimPrefix(dc.username, "~"))
227 if u == nil {
228 dc.logger.Printf("failed authentication: unknown username %q", dc.username)
229 dc.SendMessage(&irc.Message{
230 Prefix: dc.srv.prefix(),
231 Command: irc.ERR_PASSWDMISMATCH,
232 Params: []string{"*", "Invalid username or password"},
233 })
234 return nil
235 }
236
237 dc.registered = true
238 dc.user = u
239
240 u.lock.Lock()
241 firstDownstream := len(u.downstreamConns) == 0
242 u.downstreamConns = append(u.downstreamConns, dc)
243 u.lock.Unlock()
244
245 dc.SendMessage(&irc.Message{
246 Prefix: dc.srv.prefix(),
247 Command: irc.RPL_WELCOME,
248 Params: []string{dc.nick, "Welcome to jounce, " + dc.nick},
249 })
250 dc.SendMessage(&irc.Message{
251 Prefix: dc.srv.prefix(),
252 Command: irc.RPL_YOURHOST,
253 Params: []string{dc.nick, "Your host is " + dc.srv.Hostname},
254 })
255 dc.SendMessage(&irc.Message{
256 Prefix: dc.srv.prefix(),
257 Command: irc.RPL_CREATED,
258 Params: []string{dc.nick, "Who cares when the server was created?"},
259 })
260 dc.SendMessage(&irc.Message{
261 Prefix: dc.srv.prefix(),
262 Command: irc.RPL_MYINFO,
263 Params: []string{dc.nick, dc.srv.Hostname, "jounce", "aiwroO", "OovaimnqpsrtklbeI"},
264 })
265 dc.SendMessage(&irc.Message{
266 Prefix: dc.srv.prefix(),
267 Command: irc.ERR_NOMOTD,
268 Params: []string{dc.nick, "No MOTD"},
269 })
270
271 u.forEachUpstream(func(uc *upstreamConn) {
272 // TODO: fix races accessing upstream connection data
273 for _, ch := range uc.channels {
274 if ch.complete {
275 forwardChannel(dc, ch)
276 }
277 }
278
279 // TODO: let clients specify the ring buffer name in their username
280 historyName := ""
281
282 var seqPtr *uint64
283 if firstDownstream {
284 seq, ok := uc.history[historyName]
285 if ok {
286 seqPtr = &seq
287 }
288 }
289
290 consumer, ch := uc.ring.NewConsumer(seqPtr)
291 go func() {
292 for {
293 var closed bool
294 select {
295 case <-ch:
296 dc.consumers <- consumer
297 case <-dc.closed:
298 closed = true
299 }
300 if closed {
301 break
302 }
303 }
304
305 seq := consumer.Close()
306
307 dc.user.lock.Lock()
308 lastDownstream := len(dc.user.downstreamConns) == 0
309 dc.user.lock.Unlock()
310
311 if lastDownstream {
312 uc.history[historyName] = seq
313 }
314 }()
315 })
316
317 return nil
318}
319
320func (dc *downstreamConn) handleMessageRegistered(msg *irc.Message) error {
321 switch msg.Command {
322 case "USER":
323 return ircError{&irc.Message{
324 Command: irc.ERR_ALREADYREGISTERED,
325 Params: []string{dc.nick, "You may not reregister"},
326 }}
327 case "NICK":
328 dc.user.forEachUpstream(func(uc *upstreamConn) {
329 uc.SendMessage(msg)
330 })
331 case "JOIN":
332 var name string
333 if err := parseMessageParams(msg, &name); err != nil {
334 return err
335 }
336
337 if ch, _ := dc.user.getChannel(name); ch != nil {
338 break // already joined
339 }
340
341 // TODO: extract network name from channel name
342 return ircError{&irc.Message{
343 Command: irc.ERR_NOSUCHCHANNEL,
344 Params: []string{name, "Channel name ambiguous"},
345 }}
346 case "PART":
347 var name string
348 if err := parseMessageParams(msg, &name); err != nil {
349 return err
350 }
351
352 ch, err := dc.user.getChannel(name)
353 if err != nil {
354 return err
355 }
356
357 ch.conn.SendMessage(msg)
358 // TODO: remove channel from upstream config
359 case "MODE":
360 var name string
361 if err := parseMessageParams(msg, &name); err != nil {
362 return err
363 }
364
365 var modeStr string
366 if len(msg.Params) > 1 {
367 modeStr = msg.Params[1]
368 }
369
370 if msg.Prefix.Name != name {
371 ch, err := dc.user.getChannel(name)
372 if err != nil {
373 return err
374 }
375
376 if modeStr != "" {
377 ch.conn.SendMessage(msg)
378 } else {
379 dc.SendMessage(&irc.Message{
380 Prefix: dc.srv.prefix(),
381 Command: irc.RPL_CHANNELMODEIS,
382 Params: []string{ch.Name, string(ch.modes)},
383 })
384 }
385 } else {
386 if name != dc.nick {
387 return ircError{&irc.Message{
388 Command: irc.ERR_USERSDONTMATCH,
389 Params: []string{dc.nick, "Cannot change mode for other users"},
390 }}
391 }
392
393 if modeStr != "" {
394 dc.user.forEachUpstream(func(uc *upstreamConn) {
395 uc.SendMessage(msg)
396 })
397 } else {
398 dc.SendMessage(&irc.Message{
399 Prefix: dc.srv.prefix(),
400 Command: irc.RPL_UMODEIS,
401 Params: []string{""}, // TODO
402 })
403 }
404 }
405 case "PRIVMSG":
406 var targetsStr, text string
407 if err := parseMessageParams(msg, &targetsStr, &text); err != nil {
408 return err
409 }
410
411 for _, name := range strings.Split(targetsStr, ",") {
412 ch, err := dc.user.getChannel(name)
413 if err != nil {
414 return err
415 }
416
417 ch.conn.SendMessage(&irc.Message{
418 Prefix: msg.Prefix,
419 Command: "PRIVMSG",
420 Params: []string{name, text},
421 })
422 }
423 default:
424 dc.logger.Printf("unhandled message: %v", msg)
425 return newUnknownCommandError(msg.Command)
426 }
427 return nil
428}
Note: See TracBrowser for help on using the repository browser.