source: code/trunk/upstream.go@ 93

Last change on this file since 93 was 93, checked in by contact, 5 years ago

Add support for upstream PASS command

File size: 11.1 KB
Line 
1package jounce
2
3import (
4 "crypto/tls"
5 "fmt"
6 "io"
7 "net"
8 "strconv"
9 "strings"
10 "time"
11
12 "gopkg.in/irc.v3"
13)
14
15type upstreamChannel struct {
16 Name string
17 conn *upstreamConn
18 Topic string
19 TopicWho string
20 TopicTime time.Time
21 Status channelStatus
22 modes modeSet
23 Members map[string]membership
24 complete bool
25}
26
27type upstreamConn struct {
28 network *network
29 logger Logger
30 net net.Conn
31 irc *irc.Conn
32 srv *Server
33 user *user
34 messages chan<- *irc.Message
35 ring *Ring
36
37 serverName string
38 availableUserModes string
39 availableChannelModes string
40 channelModesWithParam string
41
42 registered bool
43 nick string
44 username string
45 realname string
46 closed bool
47 modes modeSet
48 channels map[string]*upstreamChannel
49 history map[string]uint64
50 caps map[string]string
51}
52
53func connectToUpstream(network *network) (*upstreamConn, error) {
54 logger := &prefixLogger{network.user.srv.Logger, fmt.Sprintf("upstream %q: ", network.Addr)}
55
56 addr := network.Addr
57 if !strings.ContainsRune(addr, ':') {
58 addr = addr + ":6697"
59 }
60
61 logger.Printf("connecting to TLS server at address %q", addr)
62 netConn, err := tls.Dial("tcp", addr, nil)
63 if err != nil {
64 return nil, fmt.Errorf("failed to dial %q: %v", addr, err)
65 }
66
67 setKeepAlive(netConn)
68
69 msgs := make(chan *irc.Message, 64)
70 uc := &upstreamConn{
71 network: network,
72 logger: logger,
73 net: netConn,
74 irc: irc.NewConn(netConn),
75 srv: network.user.srv,
76 user: network.user,
77 messages: msgs,
78 ring: NewRing(network.user.srv.RingCap),
79 channels: make(map[string]*upstreamChannel),
80 history: make(map[string]uint64),
81 caps: make(map[string]string),
82 }
83
84 go func() {
85 for msg := range msgs {
86 if uc.srv.Debug {
87 uc.logger.Printf("sent: %v", msg)
88 }
89 if err := uc.irc.WriteMessage(msg); err != nil {
90 uc.logger.Printf("failed to write message: %v", err)
91 }
92 }
93 if err := uc.net.Close(); err != nil {
94 uc.logger.Printf("failed to close connection: %v", err)
95 } else {
96 uc.logger.Printf("connection closed")
97 }
98 }()
99
100 return uc, nil
101}
102
103func (uc *upstreamConn) Close() error {
104 if uc.closed {
105 return fmt.Errorf("upstream connection already closed")
106 }
107 close(uc.messages)
108 uc.closed = true
109 return nil
110}
111
112func (uc *upstreamConn) forEachDownstream(f func(*downstreamConn)) {
113 uc.user.forEachDownstream(func(dc *downstreamConn) {
114 if dc.network != nil && dc.network != uc.network {
115 return
116 }
117 f(dc)
118 })
119}
120
121func (uc *upstreamConn) getChannel(name string) (*upstreamChannel, error) {
122 ch, ok := uc.channels[name]
123 if !ok {
124 return nil, fmt.Errorf("unknown channel %q", name)
125 }
126 return ch, nil
127}
128
129func (uc *upstreamConn) handleMessage(msg *irc.Message) error {
130 switch msg.Command {
131 case "PING":
132 uc.SendMessage(&irc.Message{
133 Command: "PONG",
134 Params: msg.Params,
135 })
136 return nil
137 case "MODE":
138 if msg.Prefix == nil {
139 return fmt.Errorf("missing prefix")
140 }
141
142 var name, modeStr string
143 if err := parseMessageParams(msg, &name, &modeStr); err != nil {
144 return err
145 }
146
147 if name == msg.Prefix.Name { // user mode change
148 if name != uc.nick {
149 return fmt.Errorf("received MODE message for unknow nick %q", name)
150 }
151 return uc.modes.Apply(modeStr)
152 } else { // channel mode change
153 ch, err := uc.getChannel(name)
154 if err != nil {
155 return err
156 }
157 if err := ch.modes.Apply(modeStr); err != nil {
158 return err
159 }
160
161 uc.forEachDownstream(func(dc *downstreamConn) {
162 dc.SendMessage(&irc.Message{
163 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
164 Command: "MODE",
165 Params: []string{dc.marshalChannel(uc, name), modeStr},
166 })
167 })
168 }
169 case "NOTICE":
170 uc.logger.Print(msg)
171 case "CAP":
172 if len(msg.Params) < 2 {
173 return newNeedMoreParamsError(msg.Command)
174 }
175 caps := strings.Fields(msg.Params[len(msg.Params)-1])
176 more := msg.Params[len(msg.Params)-2] == "*"
177
178 for _, s := range caps {
179 kv := strings.SplitN(s, "=", 2)
180 k := strings.ToLower(kv[0])
181 var v string
182 if len(kv) >= 2 {
183 v = kv[1]
184 }
185 uc.caps[k] = v
186 }
187
188 if !more {
189 uc.SendMessage(&irc.Message{
190 Command: "CAP",
191 Params: []string{"END"},
192 })
193 }
194 case irc.RPL_WELCOME:
195 uc.registered = true
196 uc.logger.Printf("connection registered")
197
198 channels, err := uc.srv.db.ListChannels(uc.network.ID)
199 if err != nil {
200 uc.logger.Printf("failed to list channels from database: %v", err)
201 break
202 }
203
204 for _, ch := range channels {
205 uc.SendMessage(&irc.Message{
206 Command: "JOIN",
207 Params: []string{ch.Name},
208 })
209 }
210 case irc.RPL_MYINFO:
211 if err := parseMessageParams(msg, nil, &uc.serverName, nil, &uc.availableUserModes, &uc.availableChannelModes); err != nil {
212 return err
213 }
214 if len(msg.Params) > 5 {
215 uc.channelModesWithParam = msg.Params[5]
216 }
217 case "NICK":
218 if msg.Prefix == nil {
219 return fmt.Errorf("expected a prefix")
220 }
221
222 var newNick string
223 if err := parseMessageParams(msg, &newNick); err != nil {
224 return err
225 }
226
227 if msg.Prefix.Name == uc.nick {
228 uc.logger.Printf("changed nick from %q to %q", uc.nick, newNick)
229 uc.nick = newNick
230 }
231
232 for _, ch := range uc.channels {
233 if membership, ok := ch.Members[msg.Prefix.Name]; ok {
234 delete(ch.Members, msg.Prefix.Name)
235 ch.Members[newNick] = membership
236 }
237 }
238
239 if msg.Prefix.Name != uc.nick {
240 uc.forEachDownstream(func(dc *downstreamConn) {
241 dc.SendMessage(&irc.Message{
242 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
243 Command: "NICK",
244 Params: []string{newNick},
245 })
246 })
247 }
248 case "JOIN":
249 if msg.Prefix == nil {
250 return fmt.Errorf("expected a prefix")
251 }
252
253 var channels string
254 if err := parseMessageParams(msg, &channels); err != nil {
255 return err
256 }
257
258 for _, ch := range strings.Split(channels, ",") {
259 if msg.Prefix.Name == uc.nick {
260 uc.logger.Printf("joined channel %q", ch)
261 uc.channels[ch] = &upstreamChannel{
262 Name: ch,
263 conn: uc,
264 Members: make(map[string]membership),
265 }
266 } else {
267 ch, err := uc.getChannel(ch)
268 if err != nil {
269 return err
270 }
271 ch.Members[msg.Prefix.Name] = 0
272 }
273
274 uc.forEachDownstream(func(dc *downstreamConn) {
275 dc.SendMessage(&irc.Message{
276 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
277 Command: "JOIN",
278 Params: []string{dc.marshalChannel(uc, ch)},
279 })
280 })
281 }
282 case "PART":
283 if msg.Prefix == nil {
284 return fmt.Errorf("expected a prefix")
285 }
286
287 var channels string
288 if err := parseMessageParams(msg, &channels); err != nil {
289 return err
290 }
291
292 for _, ch := range strings.Split(channels, ",") {
293 if msg.Prefix.Name == uc.nick {
294 uc.logger.Printf("parted channel %q", ch)
295 delete(uc.channels, ch)
296 } else {
297 ch, err := uc.getChannel(ch)
298 if err != nil {
299 return err
300 }
301 delete(ch.Members, msg.Prefix.Name)
302 }
303
304 uc.forEachDownstream(func(dc *downstreamConn) {
305 dc.SendMessage(&irc.Message{
306 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
307 Command: "PART",
308 Params: []string{dc.marshalChannel(uc, ch)},
309 })
310 })
311 }
312 case "QUIT":
313 if msg.Prefix == nil {
314 return fmt.Errorf("expected a prefix")
315 }
316
317 if msg.Prefix.Name == uc.nick {
318 uc.logger.Printf("quit")
319 }
320
321 for _, ch := range uc.channels {
322 delete(ch.Members, msg.Prefix.Name)
323 }
324
325 if msg.Prefix.Name != uc.nick {
326 uc.forEachDownstream(func(dc *downstreamConn) {
327 dc.SendMessage(&irc.Message{
328 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
329 Command: "QUIT",
330 Params: msg.Params,
331 })
332 })
333 }
334 case irc.RPL_TOPIC, irc.RPL_NOTOPIC:
335 var name, topic string
336 if err := parseMessageParams(msg, nil, &name, &topic); err != nil {
337 return err
338 }
339 ch, err := uc.getChannel(name)
340 if err != nil {
341 return err
342 }
343 if msg.Command == irc.RPL_TOPIC {
344 ch.Topic = topic
345 } else {
346 ch.Topic = ""
347 }
348 case "TOPIC":
349 var name string
350 if err := parseMessageParams(msg, &name); err != nil {
351 return err
352 }
353 ch, err := uc.getChannel(name)
354 if err != nil {
355 return err
356 }
357 if len(msg.Params) > 1 {
358 ch.Topic = msg.Params[1]
359 } else {
360 ch.Topic = ""
361 }
362 uc.forEachDownstream(func(dc *downstreamConn) {
363 params := []string{dc.marshalChannel(uc, name)}
364 if ch.Topic != "" {
365 params = append(params, ch.Topic)
366 }
367 dc.SendMessage(&irc.Message{
368 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
369 Command: "TOPIC",
370 Params: params,
371 })
372 })
373 case rpl_topicwhotime:
374 var name, who, timeStr string
375 if err := parseMessageParams(msg, nil, &name, &who, &timeStr); err != nil {
376 return err
377 }
378 ch, err := uc.getChannel(name)
379 if err != nil {
380 return err
381 }
382 ch.TopicWho = who
383 sec, err := strconv.ParseInt(timeStr, 10, 64)
384 if err != nil {
385 return fmt.Errorf("failed to parse topic time: %v", err)
386 }
387 ch.TopicTime = time.Unix(sec, 0)
388 case irc.RPL_NAMREPLY:
389 var name, statusStr, members string
390 if err := parseMessageParams(msg, nil, &statusStr, &name, &members); err != nil {
391 return err
392 }
393 ch, err := uc.getChannel(name)
394 if err != nil {
395 return err
396 }
397
398 status, err := parseChannelStatus(statusStr)
399 if err != nil {
400 return err
401 }
402 ch.Status = status
403
404 for _, s := range strings.Split(members, " ") {
405 membership, nick := parseMembershipPrefix(s)
406 ch.Members[nick] = membership
407 }
408 case irc.RPL_ENDOFNAMES:
409 var name string
410 if err := parseMessageParams(msg, nil, &name); err != nil {
411 return err
412 }
413 ch, err := uc.getChannel(name)
414 if err != nil {
415 return err
416 }
417
418 if ch.complete {
419 return fmt.Errorf("received unexpected RPL_ENDOFNAMES")
420 }
421 ch.complete = true
422
423 uc.forEachDownstream(func(dc *downstreamConn) {
424 forwardChannel(dc, ch)
425 })
426 case "PRIVMSG":
427 if err := parseMessageParams(msg, nil, nil); err != nil {
428 return err
429 }
430 uc.ring.Produce(msg)
431 case irc.RPL_YOURHOST, irc.RPL_CREATED:
432 // Ignore
433 case irc.RPL_LUSERCLIENT, irc.RPL_LUSEROP, irc.RPL_LUSERUNKNOWN, irc.RPL_LUSERCHANNELS, irc.RPL_LUSERME:
434 // Ignore
435 case irc.RPL_MOTDSTART, irc.RPL_MOTD, irc.RPL_ENDOFMOTD:
436 // Ignore
437 case rpl_localusers, rpl_globalusers:
438 // Ignore
439 case irc.RPL_STATSVLINE, irc.RPL_STATSPING, irc.RPL_STATSBLINE, irc.RPL_STATSDLINE:
440 // Ignore
441 default:
442 uc.logger.Printf("unhandled upstream message: %v", msg)
443 }
444 return nil
445}
446
447func (uc *upstreamConn) register() {
448 uc.nick = uc.network.Nick
449 uc.username = uc.network.Username
450 if uc.username == "" {
451 uc.username = uc.nick
452 }
453 uc.realname = uc.network.Realname
454 if uc.realname == "" {
455 uc.realname = uc.nick
456 }
457
458 uc.SendMessage(&irc.Message{
459 Command: "CAP",
460 Params: []string{"LS", "302"},
461 })
462
463 if uc.network.Pass != "" {
464 uc.SendMessage(&irc.Message{
465 Command: "PASS",
466 Params: []string{uc.network.Pass},
467 })
468 }
469
470 uc.SendMessage(&irc.Message{
471 Command: "NICK",
472 Params: []string{uc.nick},
473 })
474 uc.SendMessage(&irc.Message{
475 Command: "USER",
476 Params: []string{uc.username, "0", "*", uc.realname},
477 })
478}
479
480func (uc *upstreamConn) readMessages() error {
481 for {
482 msg, err := uc.irc.ReadMessage()
483 if err == io.EOF {
484 break
485 } else if err != nil {
486 return fmt.Errorf("failed to read IRC command: %v", err)
487 }
488
489 if uc.srv.Debug {
490 uc.logger.Printf("received: %v", msg)
491 }
492
493 if err := uc.handleMessage(msg); err != nil {
494 uc.logger.Printf("failed to handle message %q: %v", msg, err)
495 }
496 }
497
498 return nil
499}
500
501func (uc *upstreamConn) SendMessage(msg *irc.Message) {
502 uc.messages <- msg
503}
Note: See TracBrowser for help on using the repository browser.