source: code/trunk/upstream.go@ 74

Last change on this file since 74 was 74, checked in by contact, 5 years ago

Fix TOPIC parsing, broadcast to downstream clients

File size: 9.2 KB
Line 
1package jounce
2
3import (
4 "crypto/tls"
5 "fmt"
6 "io"
7 "net"
8 "strconv"
9 "strings"
10 "time"
11
12 "gopkg.in/irc.v3"
13)
14
15type upstreamChannel struct {
16 Name string
17 conn *upstreamConn
18 Topic string
19 TopicWho string
20 TopicTime time.Time
21 Status channelStatus
22 modes modeSet
23 Members map[string]membership
24 complete bool
25}
26
27type upstreamConn struct {
28 upstream *Upstream
29 logger Logger
30 net net.Conn
31 irc *irc.Conn
32 srv *Server
33 user *user
34 messages chan<- *irc.Message
35 ring *Ring
36
37 serverName string
38 availableUserModes string
39 availableChannelModes string
40 channelModesWithParam string
41
42 registered bool
43 nick string
44 closed bool
45 modes modeSet
46 channels map[string]*upstreamChannel
47 history map[string]uint64
48}
49
50func connectToUpstream(u *user, upstream *Upstream) (*upstreamConn, error) {
51 logger := &prefixLogger{u.srv.Logger, fmt.Sprintf("upstream %q: ", upstream.Addr)}
52 logger.Printf("connecting to server")
53
54 netConn, err := tls.Dial("tcp", upstream.Addr, nil)
55 if err != nil {
56 return nil, fmt.Errorf("failed to dial %q: %v", upstream.Addr, err)
57 }
58
59 setKeepAlive(netConn)
60
61 msgs := make(chan *irc.Message, 64)
62 uc := &upstreamConn{
63 upstream: upstream,
64 logger: logger,
65 net: netConn,
66 irc: irc.NewConn(netConn),
67 srv: u.srv,
68 user: u,
69 messages: msgs,
70 ring: NewRing(u.srv.RingCap),
71 channels: make(map[string]*upstreamChannel),
72 history: make(map[string]uint64),
73 }
74
75 go func() {
76 for msg := range msgs {
77 if uc.srv.Debug {
78 uc.logger.Printf("sent: %v", msg)
79 }
80 if err := uc.irc.WriteMessage(msg); err != nil {
81 uc.logger.Printf("failed to write message: %v", err)
82 }
83 }
84 if err := uc.net.Close(); err != nil {
85 uc.logger.Printf("failed to close connection: %v", err)
86 } else {
87 uc.logger.Printf("connection closed")
88 }
89 }()
90
91 return uc, nil
92}
93
94func (uc *upstreamConn) Close() error {
95 if uc.closed {
96 return fmt.Errorf("upstream connection already closed")
97 }
98 close(uc.messages)
99 uc.closed = true
100 return nil
101}
102
103func (uc *upstreamConn) forEachDownstream(f func(*downstreamConn)) {
104 uc.user.forEachDownstream(func(dc *downstreamConn) {
105 if dc.upstream != nil && dc.upstream != uc.upstream {
106 return
107 }
108 f(dc)
109 })
110}
111
112func (uc *upstreamConn) getChannel(name string) (*upstreamChannel, error) {
113 ch, ok := uc.channels[name]
114 if !ok {
115 return nil, fmt.Errorf("unknown channel %q", name)
116 }
117 return ch, nil
118}
119
120func (uc *upstreamConn) handleMessage(msg *irc.Message) error {
121 switch msg.Command {
122 case "PING":
123 uc.SendMessage(&irc.Message{
124 Command: "PONG",
125 Params: msg.Params,
126 })
127 return nil
128 case "MODE":
129 if msg.Prefix == nil {
130 return fmt.Errorf("missing prefix")
131 }
132
133 var name, modeStr string
134 if err := parseMessageParams(msg, &name, &modeStr); err != nil {
135 return err
136 }
137
138 if name == msg.Prefix.Name { // user mode change
139 if name != uc.nick {
140 return fmt.Errorf("received MODE message for unknow nick %q", name)
141 }
142 return uc.modes.Apply(modeStr)
143 } else { // channel mode change
144 ch, err := uc.getChannel(name)
145 if err != nil {
146 return err
147 }
148 if err := ch.modes.Apply(modeStr); err != nil {
149 return err
150 }
151
152 uc.forEachDownstream(func(dc *downstreamConn) {
153 dc.SendMessage(&irc.Message{
154 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
155 Command: "MODE",
156 Params: []string{dc.marshalChannel(uc, name), modeStr},
157 })
158 })
159 }
160 case "NOTICE":
161 uc.logger.Print(msg)
162 case irc.RPL_WELCOME:
163 uc.registered = true
164 uc.logger.Printf("connection registered")
165
166 for _, ch := range uc.upstream.Channels {
167 uc.SendMessage(&irc.Message{
168 Command: "JOIN",
169 Params: []string{ch},
170 })
171 }
172 case irc.RPL_MYINFO:
173 if err := parseMessageParams(msg, nil, &uc.serverName, nil, &uc.availableUserModes, &uc.availableChannelModes); err != nil {
174 return err
175 }
176 if len(msg.Params) > 5 {
177 uc.channelModesWithParam = msg.Params[5]
178 }
179 case "NICK":
180 var newNick string
181 if err := parseMessageParams(msg, &newNick); err != nil {
182 return err
183 }
184
185 if msg.Prefix.Name == uc.nick {
186 uc.logger.Printf("changed nick from %q to %q", uc.nick, newNick)
187 uc.nick = newNick
188 }
189
190 for _, ch := range uc.channels {
191 if membership, ok := ch.Members[msg.Prefix.Name]; ok {
192 delete(ch.Members, msg.Prefix.Name)
193 ch.Members[newNick] = membership
194 }
195 }
196 case "JOIN":
197 if msg.Prefix == nil {
198 return fmt.Errorf("expected a prefix")
199 }
200
201 var channels string
202 if err := parseMessageParams(msg, &channels); err != nil {
203 return err
204 }
205
206 for _, ch := range strings.Split(channels, ",") {
207 if msg.Prefix.Name == uc.nick {
208 uc.logger.Printf("joined channel %q", ch)
209 uc.channels[ch] = &upstreamChannel{
210 Name: ch,
211 conn: uc,
212 Members: make(map[string]membership),
213 }
214 } else {
215 ch, err := uc.getChannel(ch)
216 if err != nil {
217 return err
218 }
219 ch.Members[msg.Prefix.Name] = 0
220 }
221
222 uc.forEachDownstream(func(dc *downstreamConn) {
223 dc.SendMessage(&irc.Message{
224 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
225 Command: "JOIN",
226 Params: []string{dc.marshalChannel(uc, ch)},
227 })
228 })
229 }
230 case "PART":
231 if msg.Prefix == nil {
232 return fmt.Errorf("expected a prefix")
233 }
234
235 var channels string
236 if err := parseMessageParams(msg, &channels); err != nil {
237 return err
238 }
239
240 for _, ch := range strings.Split(channels, ",") {
241 if msg.Prefix.Name == uc.nick {
242 uc.logger.Printf("parted channel %q", ch)
243 delete(uc.channels, ch)
244 } else {
245 ch, err := uc.getChannel(ch)
246 if err != nil {
247 return err
248 }
249 delete(ch.Members, msg.Prefix.Name)
250 }
251
252 uc.forEachDownstream(func(dc *downstreamConn) {
253 dc.SendMessage(&irc.Message{
254 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
255 Command: "PART",
256 Params: []string{dc.marshalChannel(uc, ch)},
257 })
258 })
259 }
260 case irc.RPL_TOPIC, irc.RPL_NOTOPIC:
261 var name, topic string
262 if err := parseMessageParams(msg, nil, &name, &topic); err != nil {
263 return err
264 }
265 ch, err := uc.getChannel(name)
266 if err != nil {
267 return err
268 }
269 if msg.Command == irc.RPL_TOPIC {
270 ch.Topic = topic
271 } else {
272 ch.Topic = ""
273 }
274 case "TOPIC":
275 var name string
276 if err := parseMessageParams(msg, &name); err != nil {
277 return err
278 }
279 ch, err := uc.getChannel(name)
280 if err != nil {
281 return err
282 }
283 if len(msg.Params) > 1 {
284 ch.Topic = msg.Params[1]
285 } else {
286 ch.Topic = ""
287 }
288 uc.forEachDownstream(func(dc *downstreamConn) {
289 params := []string{dc.marshalChannel(uc, name)}
290 if ch.Topic != "" {
291 params = append(params, ch.Topic)
292 }
293 dc.SendMessage(&irc.Message{
294 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
295 Command: "TOPIC",
296 Params: params,
297 })
298 })
299 case rpl_topicwhotime:
300 var name, who, timeStr string
301 if err := parseMessageParams(msg, nil, &name, &who, &timeStr); err != nil {
302 return err
303 }
304 ch, err := uc.getChannel(name)
305 if err != nil {
306 return err
307 }
308 ch.TopicWho = who
309 sec, err := strconv.ParseInt(timeStr, 10, 64)
310 if err != nil {
311 return fmt.Errorf("failed to parse topic time: %v", err)
312 }
313 ch.TopicTime = time.Unix(sec, 0)
314 case irc.RPL_NAMREPLY:
315 var name, statusStr, members string
316 if err := parseMessageParams(msg, nil, &statusStr, &name, &members); err != nil {
317 return err
318 }
319 ch, err := uc.getChannel(name)
320 if err != nil {
321 return err
322 }
323
324 status, err := parseChannelStatus(statusStr)
325 if err != nil {
326 return err
327 }
328 ch.Status = status
329
330 for _, s := range strings.Split(members, " ") {
331 membership, nick := parseMembershipPrefix(s)
332 ch.Members[nick] = membership
333 }
334 case irc.RPL_ENDOFNAMES:
335 var name string
336 if err := parseMessageParams(msg, nil, &name); err != nil {
337 return err
338 }
339 ch, err := uc.getChannel(name)
340 if err != nil {
341 return err
342 }
343
344 if ch.complete {
345 return fmt.Errorf("received unexpected RPL_ENDOFNAMES")
346 }
347 ch.complete = true
348
349 uc.forEachDownstream(func(dc *downstreamConn) {
350 forwardChannel(dc, ch)
351 })
352 case "PRIVMSG":
353 if err := parseMessageParams(msg, nil, nil); err != nil {
354 return err
355 }
356 uc.ring.Produce(msg)
357 case irc.RPL_YOURHOST, irc.RPL_CREATED:
358 // Ignore
359 case irc.RPL_LUSERCLIENT, irc.RPL_LUSEROP, irc.RPL_LUSERUNKNOWN, irc.RPL_LUSERCHANNELS, irc.RPL_LUSERME:
360 // Ignore
361 case irc.RPL_MOTDSTART, irc.RPL_MOTD, irc.RPL_ENDOFMOTD:
362 // Ignore
363 case rpl_localusers, rpl_globalusers:
364 // Ignore
365 case irc.RPL_STATSVLINE, irc.RPL_STATSPING, irc.RPL_STATSBLINE, irc.RPL_STATSDLINE:
366 // Ignore
367 default:
368 uc.logger.Printf("unhandled upstream message: %v", msg)
369 }
370 return nil
371}
372
373func (uc *upstreamConn) register() {
374 uc.nick = uc.upstream.Nick
375 uc.SendMessage(&irc.Message{
376 Command: "NICK",
377 Params: []string{uc.nick},
378 })
379 uc.SendMessage(&irc.Message{
380 Command: "USER",
381 Params: []string{uc.upstream.Username, "0", "*", uc.upstream.Realname},
382 })
383}
384
385func (uc *upstreamConn) readMessages() error {
386 for {
387 msg, err := uc.irc.ReadMessage()
388 if err == io.EOF {
389 break
390 } else if err != nil {
391 return fmt.Errorf("failed to read IRC command: %v", err)
392 }
393
394 if uc.srv.Debug {
395 uc.logger.Printf("received: %v", msg)
396 }
397
398 if err := uc.handleMessage(msg); err != nil {
399 uc.logger.Printf("failed to handle message %q: %v", msg, err)
400 }
401 }
402
403 return nil
404}
405
406func (uc *upstreamConn) SendMessage(msg *irc.Message) {
407 uc.messages <- msg
408}
Note: See TracBrowser for help on using the repository browser.