source: code/trunk/upstream.go@ 45

Last change on this file since 45 was 45, checked in by contact, 5 years ago

Close connection from writer goroutine

Connections were being closed from the reader goroutine, causing issues
when sending messages and immediately closing the connection.

File size: 7.7 KB
RevLine 
[13]1package jounce
2
3import (
4 "crypto/tls"
5 "fmt"
6 "io"
7 "net"
[19]8 "strconv"
[17]9 "strings"
[19]10 "time"
[13]11
12 "gopkg.in/irc.v3"
13)
14
[19]15type upstreamChannel struct {
16 Name string
17 Topic string
18 TopicWho string
19 TopicTime time.Time
20 Status channelStatus
[35]21 modes modeSet
[19]22 Members map[string]membership
[25]23 complete bool
[19]24}
25
[13]26type upstreamConn struct {
[19]27 upstream *Upstream
[21]28 logger Logger
[19]29 net net.Conn
30 irc *irc.Conn
31 srv *Server
[37]32 user *user
[33]33 messages chan<- *irc.Message
[16]34
35 serverName string
36 availableUserModes string
37 availableChannelModes string
38 channelModesWithParam string
[19]39
40 registered bool
[42]41 nick string
[33]42 closed bool
[19]43 modes modeSet
44 channels map[string]*upstreamChannel
[13]45}
46
[37]47func connectToUpstream(u *user, upstream *Upstream) (*upstreamConn, error) {
48 logger := &prefixLogger{u.srv.Logger, fmt.Sprintf("upstream %q: ", upstream.Addr)}
[33]49 logger.Printf("connecting to server")
50
51 netConn, err := tls.Dial("tcp", upstream.Addr, nil)
52 if err != nil {
53 return nil, fmt.Errorf("failed to dial %q: %v", upstream.Addr, err)
54 }
55
56 msgs := make(chan *irc.Message, 64)
57 conn := &upstreamConn{
58 upstream: upstream,
59 logger: logger,
60 net: netConn,
61 irc: irc.NewConn(netConn),
[37]62 srv: u.srv,
63 user: u,
[33]64 messages: msgs,
65 channels: make(map[string]*upstreamChannel),
66 }
67
68 go func() {
69 for msg := range msgs {
70 if err := conn.irc.WriteMessage(msg); err != nil {
71 conn.logger.Printf("failed to write message: %v", err)
72 }
73 }
[45]74 if err := conn.net.Close(); err != nil {
75 conn.logger.Printf("failed to close connection: %v", err)
76 } else {
77 conn.logger.Printf("connection closed")
78 }
[33]79 }()
80
81 return conn, nil
82}
83
84func (c *upstreamConn) Close() error {
85 if c.closed {
86 return fmt.Errorf("upstream connection already closed")
87 }
88 close(c.messages)
89 c.closed = true
90 return nil
91}
92
[19]93func (c *upstreamConn) getChannel(name string) (*upstreamChannel, error) {
94 ch, ok := c.channels[name]
95 if !ok {
96 return nil, fmt.Errorf("unknown channel %q", name)
97 }
98 return ch, nil
99}
100
[13]101func (c *upstreamConn) handleMessage(msg *irc.Message) error {
102 switch msg.Command {
103 case "PING":
104 // TODO: handle params
[33]105 c.messages <- &irc.Message{
[13]106 Command: "PONG",
107 Params: []string{c.srv.Hostname},
[33]108 }
109 return nil
[17]110 case "MODE":
[43]111 var name, modeStr string
112 if err := parseMessageParams(msg, &name, &modeStr); err != nil {
113 return err
[17]114 }
[35]115
116 if name == msg.Prefix.Name { // user mode change
[42]117 if name != c.nick {
[35]118 return fmt.Errorf("received MODE message for unknow nick %q", name)
119 }
120 return c.modes.Apply(modeStr)
121 } else { // channel mode change
122 ch, err := c.getChannel(name)
123 if err != nil {
124 return err
125 }
126 if err := ch.modes.Apply(modeStr); err != nil {
127 return err
128 }
129
[40]130 c.user.forEachDownstream(func(dc *downstreamConn) {
[35]131 dc.messages <- msg
[40]132 })
[17]133 }
[18]134 case "NOTICE":
[21]135 c.logger.Print(msg)
[14]136 case irc.RPL_WELCOME:
137 c.registered = true
[21]138 c.logger.Printf("connection registered")
[19]139
140 for _, ch := range c.upstream.Channels {
[33]141 c.messages <- &irc.Message{
[19]142 Command: "JOIN",
143 Params: []string{ch},
144 }
145 }
[16]146 case irc.RPL_MYINFO:
[43]147 if err := parseMessageParams(msg, nil, &c.serverName, nil, &c.availableUserModes, &c.availableChannelModes); err != nil {
148 return err
[16]149 }
150 if len(msg.Params) > 5 {
151 c.channelModesWithParam = msg.Params[5]
152 }
[42]153 case "NICK":
[43]154 var newNick string
155 if err := parseMessageParams(msg, &newNick); err != nil {
156 return err
[42]157 }
158
159 if msg.Prefix.Name == c.nick {
160 c.logger.Printf("changed nick from %q to %q", c.nick, newNick)
161 c.nick = newNick
162 }
163
164 for _, ch := range c.channels {
165 if membership, ok := ch.Members[msg.Prefix.Name]; ok {
166 delete(ch.Members, msg.Prefix.Name)
167 ch.Members[newNick] = membership
168 }
169 }
170
171 c.user.forEachDownstream(func(dc *downstreamConn) {
172 dc.messages <- msg
173 })
[19]174 case "JOIN":
[43]175 var channels string
176 if err := parseMessageParams(msg, &channels); err != nil {
177 return err
[19]178 }
[34]179
[43]180 for _, ch := range strings.Split(channels, ",") {
[42]181 if msg.Prefix.Name == c.nick {
[34]182 c.logger.Printf("joined channel %q", ch)
183 c.channels[ch] = &upstreamChannel{
184 Name: ch,
185 Members: make(map[string]membership),
186 }
187 } else {
188 ch, err := c.getChannel(ch)
189 if err != nil {
190 return err
191 }
192 ch.Members[msg.Prefix.Name] = 0
[19]193 }
194 }
[34]195
[40]196 c.user.forEachDownstream(func(dc *downstreamConn) {
[34]197 dc.messages <- msg
[40]198 })
[34]199 case "PART":
[43]200 var channels string
201 if err := parseMessageParams(msg, &channels); err != nil {
202 return err
[34]203 }
204
[43]205 for _, ch := range strings.Split(channels, ",") {
[42]206 if msg.Prefix.Name == c.nick {
[34]207 c.logger.Printf("parted channel %q", ch)
208 delete(c.channels, ch)
209 } else {
210 ch, err := c.getChannel(ch)
211 if err != nil {
212 return err
213 }
214 delete(ch.Members, msg.Prefix.Name)
215 }
216 }
217
[40]218 c.user.forEachDownstream(func(dc *downstreamConn) {
[34]219 dc.messages <- msg
[40]220 })
[19]221 case irc.RPL_TOPIC, irc.RPL_NOTOPIC:
[43]222 var name, topic string
223 if err := parseMessageParams(msg, nil, &name, &topic); err != nil {
224 return err
[19]225 }
[43]226 ch, err := c.getChannel(name)
[19]227 if err != nil {
228 return err
229 }
230 if msg.Command == irc.RPL_TOPIC {
[43]231 ch.Topic = topic
[19]232 } else {
233 ch.Topic = ""
234 }
235 case "TOPIC":
[43]236 var name string
237 if err := parseMessageParams(msg, nil, &name); err != nil {
238 return err
[19]239 }
[43]240 ch, err := c.getChannel(name)
[19]241 if err != nil {
242 return err
243 }
244 if len(msg.Params) > 1 {
245 ch.Topic = msg.Params[1]
246 } else {
247 ch.Topic = ""
248 }
249 case rpl_topicwhotime:
[43]250 var name, who, timeStr string
251 if err := parseMessageParams(msg, nil, &name, &who, &timeStr); err != nil {
252 return err
[19]253 }
[43]254 ch, err := c.getChannel(name)
[19]255 if err != nil {
256 return err
257 }
[43]258 ch.TopicWho = who
259 sec, err := strconv.ParseInt(timeStr, 10, 64)
[19]260 if err != nil {
261 return fmt.Errorf("failed to parse topic time: %v", err)
262 }
263 ch.TopicTime = time.Unix(sec, 0)
264 case irc.RPL_NAMREPLY:
[43]265 var name, statusStr, members string
266 if err := parseMessageParams(msg, nil, &statusStr, &name, &members); err != nil {
267 return err
[19]268 }
[43]269 ch, err := c.getChannel(name)
[19]270 if err != nil {
271 return err
272 }
273
[43]274 status, err := parseChannelStatus(statusStr)
[19]275 if err != nil {
276 return err
277 }
278 ch.Status = status
279
[43]280 for _, s := range strings.Split(members, " ") {
[19]281 membership, nick := parseMembershipPrefix(s)
282 ch.Members[nick] = membership
283 }
284 case irc.RPL_ENDOFNAMES:
[43]285 var name string
286 if err := parseMessageParams(msg, nil, &name); err != nil {
287 return err
[25]288 }
[43]289 ch, err := c.getChannel(name)
[25]290 if err != nil {
291 return err
292 }
293
[34]294 if ch.complete {
295 return fmt.Errorf("received unexpected RPL_ENDOFNAMES")
296 }
[25]297 ch.complete = true
[27]298
[40]299 c.user.forEachDownstream(func(dc *downstreamConn) {
[27]300 forwardChannel(dc, ch)
[40]301 })
[36]302 case "PRIVMSG":
[40]303 c.user.forEachDownstream(func(dc *downstreamConn) {
[36]304 dc.messages <- msg
[40]305 })
[16]306 case irc.RPL_YOURHOST, irc.RPL_CREATED:
[14]307 // Ignore
308 case irc.RPL_LUSERCLIENT, irc.RPL_LUSEROP, irc.RPL_LUSERUNKNOWN, irc.RPL_LUSERCHANNELS, irc.RPL_LUSERME:
309 // Ignore
310 case irc.RPL_MOTDSTART, irc.RPL_MOTD, irc.RPL_ENDOFMOTD:
311 // Ignore
312 case rpl_localusers, rpl_globalusers:
313 // Ignore
314 case irc.RPL_STATSVLINE, irc.RPL_STATSPING, irc.RPL_STATSBLINE, irc.RPL_STATSDLINE:
315 // Ignore
[13]316 default:
[21]317 c.logger.Printf("unhandled upstream message: %v", msg)
[13]318 }
[14]319 return nil
[13]320}
321
[44]322func (c *upstreamConn) register() {
[42]323 c.nick = c.upstream.Nick
[33]324 c.messages <- &irc.Message{
[13]325 Command: "NICK",
[23]326 Params: []string{c.upstream.Nick},
[13]327 }
[33]328 c.messages <- &irc.Message{
[13]329 Command: "USER",
[23]330 Params: []string{c.upstream.Username, "0", "*", c.upstream.Realname},
[13]331 }
[44]332}
[13]333
[44]334func (c *upstreamConn) readMessages() error {
[13]335 for {
336 msg, err := c.irc.ReadMessage()
337 if err == io.EOF {
338 break
339 } else if err != nil {
340 return fmt.Errorf("failed to read IRC command: %v", err)
341 }
342
343 if err := c.handleMessage(msg); err != nil {
[21]344 c.logger.Printf("failed to handle message %q: %v", msg, err)
[13]345 }
346 }
347
[45]348 return nil
[13]349}
Note: See TracBrowser for help on using the repository browser.