source: code/trunk/downstream.go@ 56

Last change on this file since 56 was 56, checked in by contact, 5 years ago

Add downstreamConn.writeMessages

This logic will become more complicated in upcoming commits.

File size: 7.6 KB
RevLine 
[13]1package jounce
2
3import (
4 "fmt"
5 "io"
6 "net"
[39]7 "strings"
[13]8
9 "gopkg.in/irc.v3"
10)
11
12type ircError struct {
13 Message *irc.Message
14}
15
16func newUnknownCommandError(cmd string) ircError {
17 return ircError{&irc.Message{
18 Command: irc.ERR_UNKNOWNCOMMAND,
19 Params: []string{
20 "*",
21 cmd,
22 "Unknown command",
23 },
24 }}
25}
26
27func newNeedMoreParamsError(cmd string) ircError {
28 return ircError{&irc.Message{
29 Command: irc.ERR_NEEDMOREPARAMS,
30 Params: []string{
31 "*",
32 cmd,
33 "Not enough parameters",
34 },
35 }}
36}
37
38func (err ircError) Error() string {
39 return err.Message.String()
40}
41
42type downstreamConn struct {
[26]43 net net.Conn
44 irc *irc.Conn
45 srv *Server
46 logger Logger
[56]47 messages chan *irc.Message
[22]48
[13]49 registered bool
[37]50 user *user
[13]51 closed bool
52 nick string
53 username string
54 realname string
55}
56
[22]57func newDownstreamConn(srv *Server, netConn net.Conn) *downstreamConn {
[55]58 dc := &downstreamConn{
[26]59 net: netConn,
60 irc: irc.NewConn(netConn),
61 srv: srv,
62 logger: &prefixLogger{srv.Logger, fmt.Sprintf("downstream %q: ", netConn.RemoteAddr())},
[56]63 messages: make(chan *irc.Message, 64),
[22]64 }
[26]65
66 go func() {
[56]67 if err := dc.writeMessages(); err != nil {
68 dc.logger.Printf("failed to write message: %v", err)
[26]69 }
[55]70 if err := dc.net.Close(); err != nil {
71 dc.logger.Printf("failed to close connection: %v", err)
[45]72 } else {
[55]73 dc.logger.Printf("connection closed")
[45]74 }
[26]75 }()
76
[55]77 return dc
[22]78}
79
[55]80func (dc *downstreamConn) prefix() *irc.Prefix {
[27]81 return &irc.Prefix{
[55]82 Name: dc.nick,
83 User: dc.username,
[27]84 // TODO: fill the host?
85 }
86}
87
[55]88func (dc *downstreamConn) readMessages() error {
89 dc.logger.Printf("new connection")
[22]90
91 for {
[55]92 msg, err := dc.irc.ReadMessage()
[22]93 if err == io.EOF {
94 break
95 } else if err != nil {
96 return fmt.Errorf("failed to read IRC command: %v", err)
97 }
98
[55]99 err = dc.handleMessage(msg)
[22]100 if ircErr, ok := err.(ircError); ok {
[55]101 ircErr.Message.Prefix = dc.srv.prefix()
102 dc.SendMessage(ircErr.Message)
[22]103 } else if err != nil {
104 return fmt.Errorf("failed to handle IRC command %q: %v", msg.Command, err)
105 }
106
[55]107 if dc.closed {
[22]108 return nil
109 }
110 }
111
[45]112 return nil
[22]113}
114
[56]115func (dc *downstreamConn) writeMessages() error {
116 for msg := range dc.messages {
117 if err := dc.irc.WriteMessage(msg); err != nil {
118 return err
119 }
120 }
121 return nil
122}
123
[55]124func (dc *downstreamConn) Close() error {
125 if dc.closed {
[26]126 return fmt.Errorf("downstream connection already closed")
127 }
[40]128
[55]129 if u := dc.user; u != nil {
[40]130 u.lock.Lock()
131 for i := range u.downstreamConns {
[55]132 if u.downstreamConns[i] == dc {
[40]133 u.downstreamConns = append(u.downstreamConns[:i], u.downstreamConns[i+1:]...)
134 }
135 }
136 u.lock.Unlock()
[51]137
138 // TODO: figure out a better way to advance the ring buffer consumer cursor
139 u.forEachUpstream(func(uc *upstreamConn) {
140 // TODO: let clients specify the ring buffer name in their username
141 uc.ring.Consumer("").Reset()
142 })
[13]143 }
[40]144
[55]145 close(dc.messages)
146 dc.closed = true
[40]147
[45]148 return nil
[13]149}
150
[55]151func (dc *downstreamConn) SendMessage(msg *irc.Message) {
152 dc.messages <- msg
[54]153}
154
[55]155func (dc *downstreamConn) handleMessage(msg *irc.Message) error {
[13]156 switch msg.Command {
[28]157 case "QUIT":
[55]158 return dc.Close()
[13]159 case "PING":
160 // TODO: handle params
[55]161 dc.SendMessage(&irc.Message{
162 Prefix: dc.srv.prefix(),
[13]163 Command: "PONG",
[55]164 Params: []string{dc.srv.Hostname},
[54]165 })
[26]166 return nil
[13]167 default:
[55]168 if dc.registered {
169 return dc.handleMessageRegistered(msg)
[13]170 } else {
[55]171 return dc.handleMessageUnregistered(msg)
[13]172 }
173 }
174}
175
[55]176func (dc *downstreamConn) handleMessageUnregistered(msg *irc.Message) error {
[13]177 switch msg.Command {
178 case "NICK":
[55]179 if err := parseMessageParams(msg, &dc.nick); err != nil {
[43]180 return err
[13]181 }
182 case "USER":
[43]183 var username string
[55]184 if err := parseMessageParams(msg, &username, nil, nil, &dc.realname); err != nil {
[43]185 return err
[13]186 }
[55]187 dc.username = "~" + username
[13]188 default:
[55]189 dc.logger.Printf("unhandled message: %v", msg)
[13]190 return newUnknownCommandError(msg.Command)
191 }
[55]192 if dc.username != "" && dc.nick != "" {
193 return dc.register()
[13]194 }
195 return nil
196}
197
[55]198func (dc *downstreamConn) register() error {
199 u := dc.srv.getUser(strings.TrimPrefix(dc.username, "~"))
[38]200 if u == nil {
[55]201 dc.logger.Printf("failed authentication: unknown username %q", dc.username)
202 dc.SendMessage(&irc.Message{
203 Prefix: dc.srv.prefix(),
[37]204 Command: irc.ERR_PASSWDMISMATCH,
205 Params: []string{"*", "Invalid username or password"},
[54]206 })
[37]207 return nil
208 }
209
[55]210 dc.registered = true
211 dc.user = u
[13]212
[40]213 u.lock.Lock()
[55]214 u.downstreamConns = append(u.downstreamConns, dc)
[40]215 u.lock.Unlock()
216
[55]217 dc.SendMessage(&irc.Message{
218 Prefix: dc.srv.prefix(),
[13]219 Command: irc.RPL_WELCOME,
[55]220 Params: []string{dc.nick, "Welcome to jounce, " + dc.nick},
[54]221 })
[55]222 dc.SendMessage(&irc.Message{
223 Prefix: dc.srv.prefix(),
[13]224 Command: irc.RPL_YOURHOST,
[55]225 Params: []string{dc.nick, "Your host is " + dc.srv.Hostname},
[54]226 })
[55]227 dc.SendMessage(&irc.Message{
228 Prefix: dc.srv.prefix(),
[13]229 Command: irc.RPL_CREATED,
[55]230 Params: []string{dc.nick, "Who cares when the server was created?"},
[54]231 })
[55]232 dc.SendMessage(&irc.Message{
233 Prefix: dc.srv.prefix(),
[13]234 Command: irc.RPL_MYINFO,
[55]235 Params: []string{dc.nick, dc.srv.Hostname, "jounce", "aiwroO", "OovaimnqpsrtklbeI"},
[54]236 })
[55]237 dc.SendMessage(&irc.Message{
238 Prefix: dc.srv.prefix(),
[13]239 Command: irc.ERR_NOMOTD,
[55]240 Params: []string{dc.nick, "No MOTD"},
[54]241 })
[13]242
[39]243 u.forEachUpstream(func(uc *upstreamConn) {
[30]244 // TODO: fix races accessing upstream connection data
245 for _, ch := range uc.channels {
246 if ch.complete {
[55]247 forwardChannel(dc, ch)
[30]248 }
249 }
[50]250
[51]251 // TODO: let clients specify the ring buffer name in their username
252 consumer := uc.ring.Consumer("")
[50]253 for {
[51]254 // TODO: these messages will get lost if the connection is closed
[50]255 msg := consumer.Consume()
256 if msg == nil {
257 break
258 }
[55]259 dc.SendMessage(msg)
[50]260 }
[39]261 })
[50]262
[13]263 return nil
264}
265
[55]266func (dc *downstreamConn) handleMessageRegistered(msg *irc.Message) error {
[13]267 switch msg.Command {
[42]268 case "USER":
[13]269 return ircError{&irc.Message{
270 Command: irc.ERR_ALREADYREGISTERED,
[55]271 Params: []string{dc.nick, "You may not reregister"},
[13]272 }}
[42]273 case "NICK":
[55]274 dc.user.forEachUpstream(func(uc *upstreamConn) {
[42]275 uc.messages <- msg
276 })
[48]277 case "JOIN":
278 var name string
279 if err := parseMessageParams(msg, &name); err != nil {
280 return err
281 }
282
[55]283 if ch, _ := dc.user.getChannel(name); ch != nil {
[48]284 break // already joined
285 }
286
287 // TODO: extract network name from channel name
288 return ircError{&irc.Message{
289 Command: irc.ERR_NOSUCHCHANNEL,
290 Params: []string{name, "Channel name ambiguous"},
291 }}
[49]292 case "PART":
293 var name string
294 if err := parseMessageParams(msg, &name); err != nil {
295 return err
296 }
297
[55]298 ch, err := dc.user.getChannel(name)
[49]299 if err != nil {
300 return err
301 }
302
303 ch.conn.messages <- msg
304 // TODO: remove channel from upstream config
[46]305 case "MODE":
306 var name string
307 if err := parseMessageParams(msg, &name); err != nil {
308 return err
309 }
310
311 var modeStr string
312 if len(msg.Params) > 1 {
313 modeStr = msg.Params[1]
314 }
315
316 if msg.Prefix.Name != name {
[55]317 ch, err := dc.user.getChannel(name)
[46]318 if err != nil {
319 return err
320 }
321
322 if modeStr != "" {
323 ch.conn.messages <- msg
324 } else {
[55]325 dc.SendMessage(&irc.Message{
326 Prefix: dc.srv.prefix(),
[46]327 Command: irc.RPL_CHANNELMODEIS,
328 Params: []string{ch.Name, string(ch.modes)},
[54]329 })
[46]330 }
331 } else {
[55]332 if name != dc.nick {
[46]333 return ircError{&irc.Message{
334 Command: irc.ERR_USERSDONTMATCH,
[55]335 Params: []string{dc.nick, "Cannot change mode for other users"},
[46]336 }}
337 }
338
339 if modeStr != "" {
[55]340 dc.user.forEachUpstream(func(uc *upstreamConn) {
[46]341 uc.messages <- msg
342 })
343 } else {
[55]344 dc.SendMessage(&irc.Message{
345 Prefix: dc.srv.prefix(),
[46]346 Command: irc.RPL_UMODEIS,
347 Params: []string{""}, // TODO
[54]348 })
[46]349 }
350 }
[13]351 default:
[55]352 dc.logger.Printf("unhandled message: %v", msg)
[13]353 return newUnknownCommandError(msg.Command)
354 }
[42]355 return nil
[13]356}
Note: See TracBrowser for help on using the repository browser.