source: code/trunk/downstream.go@ 57

Last change on this file since 57 was 57, checked in by contact, 5 years ago

Fix issues related to Ring

  • RingConsumer is now used directly in the goroutine responsible for writing downstream messages. This way, a message is not consumed from the ring buffer when writing it fails.
  • RingConsumer now has a channel attached. This allows PRIVMSG messages to always use RingConsumer, instead of also directly pushing messages to all downstream connections.
  • Multiple clients with the same history name are now supported.
  • Ring is now protected by a mutex.
File size: 8.3 KB
RevLine 
[13]1package jounce
2
3import (
4 "fmt"
5 "io"
6 "net"
[39]7 "strings"
[13]8
9 "gopkg.in/irc.v3"
10)
11
12type ircError struct {
13 Message *irc.Message
14}
15
16func newUnknownCommandError(cmd string) ircError {
17 return ircError{&irc.Message{
18 Command: irc.ERR_UNKNOWNCOMMAND,
19 Params: []string{
20 "*",
21 cmd,
22 "Unknown command",
23 },
24 }}
25}
26
27func newNeedMoreParamsError(cmd string) ircError {
28 return ircError{&irc.Message{
29 Command: irc.ERR_NEEDMOREPARAMS,
30 Params: []string{
31 "*",
32 cmd,
33 "Not enough parameters",
34 },
35 }}
36}
37
38func (err ircError) Error() string {
39 return err.Message.String()
40}
41
42type downstreamConn struct {
[57]43 net net.Conn
44 irc *irc.Conn
45 srv *Server
46 logger Logger
47 messages chan *irc.Message
48 consumers chan *RingConsumer
49 closed chan struct{}
[22]50
[13]51 registered bool
[37]52 user *user
[13]53 nick string
54 username string
55 realname string
56}
57
[22]58func newDownstreamConn(srv *Server, netConn net.Conn) *downstreamConn {
[55]59 dc := &downstreamConn{
[57]60 net: netConn,
61 irc: irc.NewConn(netConn),
62 srv: srv,
63 logger: &prefixLogger{srv.Logger, fmt.Sprintf("downstream %q: ", netConn.RemoteAddr())},
64 messages: make(chan *irc.Message, 64),
65 consumers: make(chan *RingConsumer),
66 closed: make(chan struct{}),
[22]67 }
[26]68
69 go func() {
[56]70 if err := dc.writeMessages(); err != nil {
71 dc.logger.Printf("failed to write message: %v", err)
[26]72 }
[55]73 if err := dc.net.Close(); err != nil {
74 dc.logger.Printf("failed to close connection: %v", err)
[45]75 } else {
[55]76 dc.logger.Printf("connection closed")
[45]77 }
[26]78 }()
79
[55]80 return dc
[22]81}
82
[55]83func (dc *downstreamConn) prefix() *irc.Prefix {
[27]84 return &irc.Prefix{
[55]85 Name: dc.nick,
86 User: dc.username,
[27]87 // TODO: fill the host?
88 }
89}
90
[57]91func (dc *downstreamConn) isClosed() bool {
92 select {
93 case <-dc.closed:
94 return true
95 default:
96 return false
97 }
98}
99
[55]100func (dc *downstreamConn) readMessages() error {
101 dc.logger.Printf("new connection")
[22]102
103 for {
[55]104 msg, err := dc.irc.ReadMessage()
[22]105 if err == io.EOF {
106 break
107 } else if err != nil {
108 return fmt.Errorf("failed to read IRC command: %v", err)
109 }
110
[55]111 err = dc.handleMessage(msg)
[22]112 if ircErr, ok := err.(ircError); ok {
[55]113 ircErr.Message.Prefix = dc.srv.prefix()
114 dc.SendMessage(ircErr.Message)
[22]115 } else if err != nil {
116 return fmt.Errorf("failed to handle IRC command %q: %v", msg.Command, err)
117 }
118
[57]119 if dc.isClosed() {
[22]120 return nil
121 }
122 }
123
[45]124 return nil
[22]125}
126
[56]127func (dc *downstreamConn) writeMessages() error {
[57]128 for {
129 var err error
130 var closed bool
131 select {
132 case msg := <-dc.messages:
133 err = dc.irc.WriteMessage(msg)
134 case consumer := <-dc.consumers:
135 for {
136 msg := consumer.Peek()
137 if msg == nil {
138 break
139 }
140 err = dc.irc.WriteMessage(msg)
141 if err != nil {
142 break
143 }
144 consumer.Consume()
145 }
146 case <-dc.closed:
147 closed = true
148 }
149 if err != nil {
[56]150 return err
151 }
[57]152 if closed {
153 break
154 }
[56]155 }
156 return nil
157}
158
[55]159func (dc *downstreamConn) Close() error {
[57]160 if dc.isClosed() {
[26]161 return fmt.Errorf("downstream connection already closed")
162 }
[40]163
[55]164 if u := dc.user; u != nil {
[40]165 u.lock.Lock()
166 for i := range u.downstreamConns {
[55]167 if u.downstreamConns[i] == dc {
[40]168 u.downstreamConns = append(u.downstreamConns[:i], u.downstreamConns[i+1:]...)
169 }
170 }
171 u.lock.Unlock()
[13]172 }
[40]173
[57]174 close(dc.closed)
[45]175 return nil
[13]176}
177
[55]178func (dc *downstreamConn) SendMessage(msg *irc.Message) {
179 dc.messages <- msg
[54]180}
181
[55]182func (dc *downstreamConn) handleMessage(msg *irc.Message) error {
[13]183 switch msg.Command {
[28]184 case "QUIT":
[55]185 return dc.Close()
[13]186 case "PING":
187 // TODO: handle params
[55]188 dc.SendMessage(&irc.Message{
189 Prefix: dc.srv.prefix(),
[13]190 Command: "PONG",
[55]191 Params: []string{dc.srv.Hostname},
[54]192 })
[26]193 return nil
[13]194 default:
[55]195 if dc.registered {
196 return dc.handleMessageRegistered(msg)
[13]197 } else {
[55]198 return dc.handleMessageUnregistered(msg)
[13]199 }
200 }
201}
202
[55]203func (dc *downstreamConn) handleMessageUnregistered(msg *irc.Message) error {
[13]204 switch msg.Command {
205 case "NICK":
[55]206 if err := parseMessageParams(msg, &dc.nick); err != nil {
[43]207 return err
[13]208 }
209 case "USER":
[43]210 var username string
[55]211 if err := parseMessageParams(msg, &username, nil, nil, &dc.realname); err != nil {
[43]212 return err
[13]213 }
[55]214 dc.username = "~" + username
[13]215 default:
[55]216 dc.logger.Printf("unhandled message: %v", msg)
[13]217 return newUnknownCommandError(msg.Command)
218 }
[55]219 if dc.username != "" && dc.nick != "" {
220 return dc.register()
[13]221 }
222 return nil
223}
224
[55]225func (dc *downstreamConn) register() error {
226 u := dc.srv.getUser(strings.TrimPrefix(dc.username, "~"))
[38]227 if u == nil {
[55]228 dc.logger.Printf("failed authentication: unknown username %q", dc.username)
229 dc.SendMessage(&irc.Message{
230 Prefix: dc.srv.prefix(),
[37]231 Command: irc.ERR_PASSWDMISMATCH,
232 Params: []string{"*", "Invalid username or password"},
[54]233 })
[37]234 return nil
235 }
236
[55]237 dc.registered = true
238 dc.user = u
[13]239
[40]240 u.lock.Lock()
[57]241 firstDownstream := len(u.downstreamConns) == 0
[55]242 u.downstreamConns = append(u.downstreamConns, dc)
[40]243 u.lock.Unlock()
244
[55]245 dc.SendMessage(&irc.Message{
246 Prefix: dc.srv.prefix(),
[13]247 Command: irc.RPL_WELCOME,
[55]248 Params: []string{dc.nick, "Welcome to jounce, " + dc.nick},
[54]249 })
[55]250 dc.SendMessage(&irc.Message{
251 Prefix: dc.srv.prefix(),
[13]252 Command: irc.RPL_YOURHOST,
[55]253 Params: []string{dc.nick, "Your host is " + dc.srv.Hostname},
[54]254 })
[55]255 dc.SendMessage(&irc.Message{
256 Prefix: dc.srv.prefix(),
[13]257 Command: irc.RPL_CREATED,
[55]258 Params: []string{dc.nick, "Who cares when the server was created?"},
[54]259 })
[55]260 dc.SendMessage(&irc.Message{
261 Prefix: dc.srv.prefix(),
[13]262 Command: irc.RPL_MYINFO,
[55]263 Params: []string{dc.nick, dc.srv.Hostname, "jounce", "aiwroO", "OovaimnqpsrtklbeI"},
[54]264 })
[55]265 dc.SendMessage(&irc.Message{
266 Prefix: dc.srv.prefix(),
[13]267 Command: irc.ERR_NOMOTD,
[55]268 Params: []string{dc.nick, "No MOTD"},
[54]269 })
[13]270
[39]271 u.forEachUpstream(func(uc *upstreamConn) {
[30]272 // TODO: fix races accessing upstream connection data
273 for _, ch := range uc.channels {
274 if ch.complete {
[55]275 forwardChannel(dc, ch)
[30]276 }
277 }
[50]278
[51]279 // TODO: let clients specify the ring buffer name in their username
[57]280 historyName := ""
281
282 var seqPtr *uint64
283 if firstDownstream {
284 seq, ok := uc.history[historyName]
285 if ok {
286 seqPtr = &seq
[50]287 }
288 }
[57]289
290 consumer, ch := uc.ring.Consumer(seqPtr)
291 go func() {
292 for {
293 var closed bool
294 select {
295 case <-ch:
296 dc.consumers <- consumer
297 case <-dc.closed:
298 closed = true
299 }
300 if closed {
301 break
302 }
303 }
304
305 seq := consumer.Close()
306
307 dc.user.lock.Lock()
308 lastDownstream := len(dc.user.downstreamConns) == 0
309 dc.user.lock.Unlock()
310
311 if lastDownstream {
312 uc.history[historyName] = seq
313 }
314 }()
[39]315 })
[50]316
[13]317 return nil
318}
319
[55]320func (dc *downstreamConn) handleMessageRegistered(msg *irc.Message) error {
[13]321 switch msg.Command {
[42]322 case "USER":
[13]323 return ircError{&irc.Message{
324 Command: irc.ERR_ALREADYREGISTERED,
[55]325 Params: []string{dc.nick, "You may not reregister"},
[13]326 }}
[42]327 case "NICK":
[55]328 dc.user.forEachUpstream(func(uc *upstreamConn) {
[42]329 uc.messages <- msg
330 })
[48]331 case "JOIN":
332 var name string
333 if err := parseMessageParams(msg, &name); err != nil {
334 return err
335 }
336
[55]337 if ch, _ := dc.user.getChannel(name); ch != nil {
[48]338 break // already joined
339 }
340
341 // TODO: extract network name from channel name
342 return ircError{&irc.Message{
343 Command: irc.ERR_NOSUCHCHANNEL,
344 Params: []string{name, "Channel name ambiguous"},
345 }}
[49]346 case "PART":
347 var name string
348 if err := parseMessageParams(msg, &name); err != nil {
349 return err
350 }
351
[55]352 ch, err := dc.user.getChannel(name)
[49]353 if err != nil {
354 return err
355 }
356
357 ch.conn.messages <- msg
358 // TODO: remove channel from upstream config
[46]359 case "MODE":
360 var name string
361 if err := parseMessageParams(msg, &name); err != nil {
362 return err
363 }
364
365 var modeStr string
366 if len(msg.Params) > 1 {
367 modeStr = msg.Params[1]
368 }
369
370 if msg.Prefix.Name != name {
[55]371 ch, err := dc.user.getChannel(name)
[46]372 if err != nil {
373 return err
374 }
375
376 if modeStr != "" {
377 ch.conn.messages <- msg
378 } else {
[55]379 dc.SendMessage(&irc.Message{
380 Prefix: dc.srv.prefix(),
[46]381 Command: irc.RPL_CHANNELMODEIS,
382 Params: []string{ch.Name, string(ch.modes)},
[54]383 })
[46]384 }
385 } else {
[55]386 if name != dc.nick {
[46]387 return ircError{&irc.Message{
388 Command: irc.ERR_USERSDONTMATCH,
[55]389 Params: []string{dc.nick, "Cannot change mode for other users"},
[46]390 }}
391 }
392
393 if modeStr != "" {
[55]394 dc.user.forEachUpstream(func(uc *upstreamConn) {
[46]395 uc.messages <- msg
396 })
397 } else {
[55]398 dc.SendMessage(&irc.Message{
399 Prefix: dc.srv.prefix(),
[46]400 Command: irc.RPL_UMODEIS,
401 Params: []string{""}, // TODO
[54]402 })
[46]403 }
404 }
[13]405 default:
[55]406 dc.logger.Printf("unhandled message: %v", msg)
[13]407 return newUnknownCommandError(msg.Command)
408 }
[42]409 return nil
[13]410}
Note: See TracBrowser for help on using the repository browser.