source: code/trunk/upstream.go@ 91

Last change on this file since 91 was 83, checked in by contact, 5 years ago

Pass-through QUIT messages

File size: 10.4 KB
Line 
1package jounce
2
3import (
4 "crypto/tls"
5 "fmt"
6 "io"
7 "net"
8 "strconv"
9 "strings"
10 "time"
11
12 "gopkg.in/irc.v3"
13)
14
15type upstreamChannel struct {
16 Name string
17 conn *upstreamConn
18 Topic string
19 TopicWho string
20 TopicTime time.Time
21 Status channelStatus
22 modes modeSet
23 Members map[string]membership
24 complete bool
25}
26
27type upstreamConn struct {
28 network *network
29 logger Logger
30 net net.Conn
31 irc *irc.Conn
32 srv *Server
33 user *user
34 messages chan<- *irc.Message
35 ring *Ring
36
37 serverName string
38 availableUserModes string
39 availableChannelModes string
40 channelModesWithParam string
41
42 registered bool
43 nick string
44 username string
45 realname string
46 closed bool
47 modes modeSet
48 channels map[string]*upstreamChannel
49 history map[string]uint64
50}
51
52func connectToUpstream(network *network) (*upstreamConn, error) {
53 logger := &prefixLogger{network.user.srv.Logger, fmt.Sprintf("upstream %q: ", network.Addr)}
54
55 addr := network.Addr
56 if !strings.ContainsRune(addr, ':') {
57 addr = addr + ":6697"
58 }
59
60 logger.Printf("connecting to TLS server at address %q", addr)
61 netConn, err := tls.Dial("tcp", addr, nil)
62 if err != nil {
63 return nil, fmt.Errorf("failed to dial %q: %v", addr, err)
64 }
65
66 setKeepAlive(netConn)
67
68 msgs := make(chan *irc.Message, 64)
69 uc := &upstreamConn{
70 network: network,
71 logger: logger,
72 net: netConn,
73 irc: irc.NewConn(netConn),
74 srv: network.user.srv,
75 user: network.user,
76 messages: msgs,
77 ring: NewRing(network.user.srv.RingCap),
78 channels: make(map[string]*upstreamChannel),
79 history: make(map[string]uint64),
80 }
81
82 go func() {
83 for msg := range msgs {
84 if uc.srv.Debug {
85 uc.logger.Printf("sent: %v", msg)
86 }
87 if err := uc.irc.WriteMessage(msg); err != nil {
88 uc.logger.Printf("failed to write message: %v", err)
89 }
90 }
91 if err := uc.net.Close(); err != nil {
92 uc.logger.Printf("failed to close connection: %v", err)
93 } else {
94 uc.logger.Printf("connection closed")
95 }
96 }()
97
98 return uc, nil
99}
100
101func (uc *upstreamConn) Close() error {
102 if uc.closed {
103 return fmt.Errorf("upstream connection already closed")
104 }
105 close(uc.messages)
106 uc.closed = true
107 return nil
108}
109
110func (uc *upstreamConn) forEachDownstream(f func(*downstreamConn)) {
111 uc.user.forEachDownstream(func(dc *downstreamConn) {
112 if dc.network != nil && dc.network != uc.network {
113 return
114 }
115 f(dc)
116 })
117}
118
119func (uc *upstreamConn) getChannel(name string) (*upstreamChannel, error) {
120 ch, ok := uc.channels[name]
121 if !ok {
122 return nil, fmt.Errorf("unknown channel %q", name)
123 }
124 return ch, nil
125}
126
127func (uc *upstreamConn) handleMessage(msg *irc.Message) error {
128 switch msg.Command {
129 case "PING":
130 uc.SendMessage(&irc.Message{
131 Command: "PONG",
132 Params: msg.Params,
133 })
134 return nil
135 case "MODE":
136 if msg.Prefix == nil {
137 return fmt.Errorf("missing prefix")
138 }
139
140 var name, modeStr string
141 if err := parseMessageParams(msg, &name, &modeStr); err != nil {
142 return err
143 }
144
145 if name == msg.Prefix.Name { // user mode change
146 if name != uc.nick {
147 return fmt.Errorf("received MODE message for unknow nick %q", name)
148 }
149 return uc.modes.Apply(modeStr)
150 } else { // channel mode change
151 ch, err := uc.getChannel(name)
152 if err != nil {
153 return err
154 }
155 if err := ch.modes.Apply(modeStr); err != nil {
156 return err
157 }
158
159 uc.forEachDownstream(func(dc *downstreamConn) {
160 dc.SendMessage(&irc.Message{
161 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
162 Command: "MODE",
163 Params: []string{dc.marshalChannel(uc, name), modeStr},
164 })
165 })
166 }
167 case "NOTICE":
168 uc.logger.Print(msg)
169 case irc.RPL_WELCOME:
170 uc.registered = true
171 uc.logger.Printf("connection registered")
172
173 channels, err := uc.srv.db.ListChannels(uc.network.ID)
174 if err != nil {
175 uc.logger.Printf("failed to list channels from database: %v", err)
176 break
177 }
178
179 for _, ch := range channels {
180 uc.SendMessage(&irc.Message{
181 Command: "JOIN",
182 Params: []string{ch.Name},
183 })
184 }
185 case irc.RPL_MYINFO:
186 if err := parseMessageParams(msg, nil, &uc.serverName, nil, &uc.availableUserModes, &uc.availableChannelModes); err != nil {
187 return err
188 }
189 if len(msg.Params) > 5 {
190 uc.channelModesWithParam = msg.Params[5]
191 }
192 case "NICK":
193 if msg.Prefix == nil {
194 return fmt.Errorf("expected a prefix")
195 }
196
197 var newNick string
198 if err := parseMessageParams(msg, &newNick); err != nil {
199 return err
200 }
201
202 if msg.Prefix.Name == uc.nick {
203 uc.logger.Printf("changed nick from %q to %q", uc.nick, newNick)
204 uc.nick = newNick
205 }
206
207 for _, ch := range uc.channels {
208 if membership, ok := ch.Members[msg.Prefix.Name]; ok {
209 delete(ch.Members, msg.Prefix.Name)
210 ch.Members[newNick] = membership
211 }
212 }
213
214 if msg.Prefix.Name != uc.nick {
215 uc.forEachDownstream(func(dc *downstreamConn) {
216 dc.SendMessage(&irc.Message{
217 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
218 Command: "NICK",
219 Params: []string{newNick},
220 })
221 })
222 }
223 case "JOIN":
224 if msg.Prefix == nil {
225 return fmt.Errorf("expected a prefix")
226 }
227
228 var channels string
229 if err := parseMessageParams(msg, &channels); err != nil {
230 return err
231 }
232
233 for _, ch := range strings.Split(channels, ",") {
234 if msg.Prefix.Name == uc.nick {
235 uc.logger.Printf("joined channel %q", ch)
236 uc.channels[ch] = &upstreamChannel{
237 Name: ch,
238 conn: uc,
239 Members: make(map[string]membership),
240 }
241 } else {
242 ch, err := uc.getChannel(ch)
243 if err != nil {
244 return err
245 }
246 ch.Members[msg.Prefix.Name] = 0
247 }
248
249 uc.forEachDownstream(func(dc *downstreamConn) {
250 dc.SendMessage(&irc.Message{
251 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
252 Command: "JOIN",
253 Params: []string{dc.marshalChannel(uc, ch)},
254 })
255 })
256 }
257 case "PART":
258 if msg.Prefix == nil {
259 return fmt.Errorf("expected a prefix")
260 }
261
262 var channels string
263 if err := parseMessageParams(msg, &channels); err != nil {
264 return err
265 }
266
267 for _, ch := range strings.Split(channels, ",") {
268 if msg.Prefix.Name == uc.nick {
269 uc.logger.Printf("parted channel %q", ch)
270 delete(uc.channels, ch)
271 } else {
272 ch, err := uc.getChannel(ch)
273 if err != nil {
274 return err
275 }
276 delete(ch.Members, msg.Prefix.Name)
277 }
278
279 uc.forEachDownstream(func(dc *downstreamConn) {
280 dc.SendMessage(&irc.Message{
281 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
282 Command: "PART",
283 Params: []string{dc.marshalChannel(uc, ch)},
284 })
285 })
286 }
287 case "QUIT":
288 if msg.Prefix == nil {
289 return fmt.Errorf("expected a prefix")
290 }
291
292 if msg.Prefix.Name == uc.nick {
293 uc.logger.Printf("quit")
294 }
295
296 for _, ch := range uc.channels {
297 delete(ch.Members, msg.Prefix.Name)
298 }
299
300 if msg.Prefix.Name != uc.nick {
301 uc.forEachDownstream(func(dc *downstreamConn) {
302 dc.SendMessage(&irc.Message{
303 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
304 Command: "QUIT",
305 Params: msg.Params,
306 })
307 })
308 }
309 case irc.RPL_TOPIC, irc.RPL_NOTOPIC:
310 var name, topic string
311 if err := parseMessageParams(msg, nil, &name, &topic); err != nil {
312 return err
313 }
314 ch, err := uc.getChannel(name)
315 if err != nil {
316 return err
317 }
318 if msg.Command == irc.RPL_TOPIC {
319 ch.Topic = topic
320 } else {
321 ch.Topic = ""
322 }
323 case "TOPIC":
324 var name string
325 if err := parseMessageParams(msg, &name); err != nil {
326 return err
327 }
328 ch, err := uc.getChannel(name)
329 if err != nil {
330 return err
331 }
332 if len(msg.Params) > 1 {
333 ch.Topic = msg.Params[1]
334 } else {
335 ch.Topic = ""
336 }
337 uc.forEachDownstream(func(dc *downstreamConn) {
338 params := []string{dc.marshalChannel(uc, name)}
339 if ch.Topic != "" {
340 params = append(params, ch.Topic)
341 }
342 dc.SendMessage(&irc.Message{
343 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
344 Command: "TOPIC",
345 Params: params,
346 })
347 })
348 case rpl_topicwhotime:
349 var name, who, timeStr string
350 if err := parseMessageParams(msg, nil, &name, &who, &timeStr); err != nil {
351 return err
352 }
353 ch, err := uc.getChannel(name)
354 if err != nil {
355 return err
356 }
357 ch.TopicWho = who
358 sec, err := strconv.ParseInt(timeStr, 10, 64)
359 if err != nil {
360 return fmt.Errorf("failed to parse topic time: %v", err)
361 }
362 ch.TopicTime = time.Unix(sec, 0)
363 case irc.RPL_NAMREPLY:
364 var name, statusStr, members string
365 if err := parseMessageParams(msg, nil, &statusStr, &name, &members); err != nil {
366 return err
367 }
368 ch, err := uc.getChannel(name)
369 if err != nil {
370 return err
371 }
372
373 status, err := parseChannelStatus(statusStr)
374 if err != nil {
375 return err
376 }
377 ch.Status = status
378
379 for _, s := range strings.Split(members, " ") {
380 membership, nick := parseMembershipPrefix(s)
381 ch.Members[nick] = membership
382 }
383 case irc.RPL_ENDOFNAMES:
384 var name string
385 if err := parseMessageParams(msg, nil, &name); err != nil {
386 return err
387 }
388 ch, err := uc.getChannel(name)
389 if err != nil {
390 return err
391 }
392
393 if ch.complete {
394 return fmt.Errorf("received unexpected RPL_ENDOFNAMES")
395 }
396 ch.complete = true
397
398 uc.forEachDownstream(func(dc *downstreamConn) {
399 forwardChannel(dc, ch)
400 })
401 case "PRIVMSG":
402 if err := parseMessageParams(msg, nil, nil); err != nil {
403 return err
404 }
405 uc.ring.Produce(msg)
406 case irc.RPL_YOURHOST, irc.RPL_CREATED:
407 // Ignore
408 case irc.RPL_LUSERCLIENT, irc.RPL_LUSEROP, irc.RPL_LUSERUNKNOWN, irc.RPL_LUSERCHANNELS, irc.RPL_LUSERME:
409 // Ignore
410 case irc.RPL_MOTDSTART, irc.RPL_MOTD, irc.RPL_ENDOFMOTD:
411 // Ignore
412 case rpl_localusers, rpl_globalusers:
413 // Ignore
414 case irc.RPL_STATSVLINE, irc.RPL_STATSPING, irc.RPL_STATSBLINE, irc.RPL_STATSDLINE:
415 // Ignore
416 default:
417 uc.logger.Printf("unhandled upstream message: %v", msg)
418 }
419 return nil
420}
421
422func (uc *upstreamConn) register() {
423 uc.nick = uc.network.Nick
424 uc.username = uc.network.Username
425 if uc.username == "" {
426 uc.username = uc.nick
427 }
428 uc.realname = uc.network.Realname
429 if uc.realname == "" {
430 uc.realname = uc.nick
431 }
432
433 uc.SendMessage(&irc.Message{
434 Command: "NICK",
435 Params: []string{uc.nick},
436 })
437 uc.SendMessage(&irc.Message{
438 Command: "USER",
439 Params: []string{uc.username, "0", "*", uc.realname},
440 })
441}
442
443func (uc *upstreamConn) readMessages() error {
444 for {
445 msg, err := uc.irc.ReadMessage()
446 if err == io.EOF {
447 break
448 } else if err != nil {
449 return fmt.Errorf("failed to read IRC command: %v", err)
450 }
451
452 if uc.srv.Debug {
453 uc.logger.Printf("received: %v", msg)
454 }
455
456 if err := uc.handleMessage(msg); err != nil {
457 uc.logger.Printf("failed to handle message %q: %v", msg, err)
458 }
459 }
460
461 return nil
462}
463
464func (uc *upstreamConn) SendMessage(msg *irc.Message) {
465 uc.messages <- msg
466}
Note: See TracBrowser for help on using the repository browser.