source: code/trunk/upstream.go@ 92

Last change on this file since 92 was 92, checked in by contact, 5 years ago

Add upstream CAP LS support

File size: 11.0 KB
RevLine 
[13]1package jounce
2
3import (
4 "crypto/tls"
5 "fmt"
6 "io"
7 "net"
[19]8 "strconv"
[17]9 "strings"
[19]10 "time"
[13]11
12 "gopkg.in/irc.v3"
13)
14
[19]15type upstreamChannel struct {
16 Name string
[46]17 conn *upstreamConn
[19]18 Topic string
19 TopicWho string
20 TopicTime time.Time
21 Status channelStatus
[35]22 modes modeSet
[19]23 Members map[string]membership
[25]24 complete bool
[19]25}
26
[13]27type upstreamConn struct {
[77]28 network *network
[21]29 logger Logger
[19]30 net net.Conn
31 irc *irc.Conn
32 srv *Server
[37]33 user *user
[33]34 messages chan<- *irc.Message
[50]35 ring *Ring
[16]36
37 serverName string
38 availableUserModes string
39 availableChannelModes string
40 channelModesWithParam string
[19]41
42 registered bool
[42]43 nick string
[77]44 username string
45 realname string
[33]46 closed bool
[19]47 modes modeSet
48 channels map[string]*upstreamChannel
[57]49 history map[string]uint64
[92]50 caps map[string]string
[13]51}
52
[77]53func connectToUpstream(network *network) (*upstreamConn, error) {
54 logger := &prefixLogger{network.user.srv.Logger, fmt.Sprintf("upstream %q: ", network.Addr)}
[33]55
[77]56 addr := network.Addr
57 if !strings.ContainsRune(addr, ':') {
58 addr = addr + ":6697"
59 }
60
61 logger.Printf("connecting to TLS server at address %q", addr)
62 netConn, err := tls.Dial("tcp", addr, nil)
[33]63 if err != nil {
[77]64 return nil, fmt.Errorf("failed to dial %q: %v", addr, err)
[33]65 }
66
[67]67 setKeepAlive(netConn)
68
[33]69 msgs := make(chan *irc.Message, 64)
[55]70 uc := &upstreamConn{
[79]71 network: network,
[33]72 logger: logger,
73 net: netConn,
74 irc: irc.NewConn(netConn),
[77]75 srv: network.user.srv,
76 user: network.user,
[33]77 messages: msgs,
[77]78 ring: NewRing(network.user.srv.RingCap),
[33]79 channels: make(map[string]*upstreamChannel),
[57]80 history: make(map[string]uint64),
[92]81 caps: make(map[string]string),
[33]82 }
83
84 go func() {
85 for msg := range msgs {
[64]86 if uc.srv.Debug {
87 uc.logger.Printf("sent: %v", msg)
88 }
[55]89 if err := uc.irc.WriteMessage(msg); err != nil {
90 uc.logger.Printf("failed to write message: %v", err)
[33]91 }
92 }
[55]93 if err := uc.net.Close(); err != nil {
94 uc.logger.Printf("failed to close connection: %v", err)
[45]95 } else {
[55]96 uc.logger.Printf("connection closed")
[45]97 }
[33]98 }()
99
[55]100 return uc, nil
[33]101}
102
[55]103func (uc *upstreamConn) Close() error {
104 if uc.closed {
[33]105 return fmt.Errorf("upstream connection already closed")
106 }
[55]107 close(uc.messages)
108 uc.closed = true
[33]109 return nil
110}
111
[73]112func (uc *upstreamConn) forEachDownstream(f func(*downstreamConn)) {
113 uc.user.forEachDownstream(func(dc *downstreamConn) {
[77]114 if dc.network != nil && dc.network != uc.network {
[73]115 return
116 }
117 f(dc)
118 })
119}
120
[55]121func (uc *upstreamConn) getChannel(name string) (*upstreamChannel, error) {
122 ch, ok := uc.channels[name]
[19]123 if !ok {
124 return nil, fmt.Errorf("unknown channel %q", name)
125 }
126 return ch, nil
127}
128
[55]129func (uc *upstreamConn) handleMessage(msg *irc.Message) error {
[13]130 switch msg.Command {
131 case "PING":
[60]132 uc.SendMessage(&irc.Message{
[13]133 Command: "PONG",
[68]134 Params: msg.Params,
[60]135 })
[33]136 return nil
[17]137 case "MODE":
[69]138 if msg.Prefix == nil {
139 return fmt.Errorf("missing prefix")
140 }
141
[43]142 var name, modeStr string
143 if err := parseMessageParams(msg, &name, &modeStr); err != nil {
144 return err
[17]145 }
[35]146
147 if name == msg.Prefix.Name { // user mode change
[55]148 if name != uc.nick {
[35]149 return fmt.Errorf("received MODE message for unknow nick %q", name)
150 }
[55]151 return uc.modes.Apply(modeStr)
[35]152 } else { // channel mode change
[55]153 ch, err := uc.getChannel(name)
[35]154 if err != nil {
155 return err
156 }
157 if err := ch.modes.Apply(modeStr); err != nil {
158 return err
159 }
[69]160
[73]161 uc.forEachDownstream(func(dc *downstreamConn) {
[69]162 dc.SendMessage(&irc.Message{
163 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
164 Command: "MODE",
165 Params: []string{dc.marshalChannel(uc, name), modeStr},
166 })
167 })
[46]168 }
[18]169 case "NOTICE":
[55]170 uc.logger.Print(msg)
[92]171 case "CAP":
172 if len(msg.Params) < 2 {
173 return newNeedMoreParamsError(msg.Command)
174 }
175 caps := strings.Fields(msg.Params[len(msg.Params) - 1])
176 more := msg.Params[len(msg.Params) - 2] == "*"
177
178 for _, s := range caps {
179 kv := strings.SplitN(s, "=", 2)
180 k := strings.ToLower(kv[0])
181 var v string
182 if len(kv) >= 2 {
183 v = kv[1]
184 }
185 uc.caps[k] = v
186 }
187
188 if !more {
189 uc.SendMessage(&irc.Message{
190 Command: "CAP",
191 Params: []string{"END"},
192 })
193 }
[14]194 case irc.RPL_WELCOME:
[55]195 uc.registered = true
196 uc.logger.Printf("connection registered")
[19]197
[77]198 channels, err := uc.srv.db.ListChannels(uc.network.ID)
199 if err != nil {
200 uc.logger.Printf("failed to list channels from database: %v", err)
201 break
202 }
203
204 for _, ch := range channels {
[60]205 uc.SendMessage(&irc.Message{
[19]206 Command: "JOIN",
[77]207 Params: []string{ch.Name},
[60]208 })
[19]209 }
[16]210 case irc.RPL_MYINFO:
[55]211 if err := parseMessageParams(msg, nil, &uc.serverName, nil, &uc.availableUserModes, &uc.availableChannelModes); err != nil {
[43]212 return err
[16]213 }
214 if len(msg.Params) > 5 {
[55]215 uc.channelModesWithParam = msg.Params[5]
[16]216 }
[42]217 case "NICK":
[83]218 if msg.Prefix == nil {
219 return fmt.Errorf("expected a prefix")
220 }
221
[43]222 var newNick string
223 if err := parseMessageParams(msg, &newNick); err != nil {
224 return err
[42]225 }
226
[55]227 if msg.Prefix.Name == uc.nick {
228 uc.logger.Printf("changed nick from %q to %q", uc.nick, newNick)
229 uc.nick = newNick
[42]230 }
231
[55]232 for _, ch := range uc.channels {
[42]233 if membership, ok := ch.Members[msg.Prefix.Name]; ok {
234 delete(ch.Members, msg.Prefix.Name)
235 ch.Members[newNick] = membership
236 }
237 }
[82]238
239 if msg.Prefix.Name != uc.nick {
240 uc.forEachDownstream(func(dc *downstreamConn) {
241 dc.SendMessage(&irc.Message{
242 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
243 Command: "NICK",
244 Params: []string{newNick},
245 })
246 })
247 }
[69]248 case "JOIN":
249 if msg.Prefix == nil {
250 return fmt.Errorf("expected a prefix")
251 }
[42]252
[43]253 var channels string
254 if err := parseMessageParams(msg, &channels); err != nil {
255 return err
[19]256 }
[34]257
[43]258 for _, ch := range strings.Split(channels, ",") {
[55]259 if msg.Prefix.Name == uc.nick {
260 uc.logger.Printf("joined channel %q", ch)
261 uc.channels[ch] = &upstreamChannel{
[34]262 Name: ch,
[55]263 conn: uc,
[34]264 Members: make(map[string]membership),
265 }
266 } else {
[55]267 ch, err := uc.getChannel(ch)
[34]268 if err != nil {
269 return err
270 }
271 ch.Members[msg.Prefix.Name] = 0
[19]272 }
[69]273
[73]274 uc.forEachDownstream(func(dc *downstreamConn) {
[69]275 dc.SendMessage(&irc.Message{
276 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
277 Command: "JOIN",
278 Params: []string{dc.marshalChannel(uc, ch)},
279 })
280 })
[19]281 }
[69]282 case "PART":
283 if msg.Prefix == nil {
284 return fmt.Errorf("expected a prefix")
285 }
[34]286
[43]287 var channels string
288 if err := parseMessageParams(msg, &channels); err != nil {
289 return err
[34]290 }
291
[43]292 for _, ch := range strings.Split(channels, ",") {
[55]293 if msg.Prefix.Name == uc.nick {
294 uc.logger.Printf("parted channel %q", ch)
295 delete(uc.channels, ch)
[34]296 } else {
[55]297 ch, err := uc.getChannel(ch)
[34]298 if err != nil {
299 return err
300 }
301 delete(ch.Members, msg.Prefix.Name)
302 }
[69]303
[73]304 uc.forEachDownstream(func(dc *downstreamConn) {
[69]305 dc.SendMessage(&irc.Message{
306 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
307 Command: "PART",
308 Params: []string{dc.marshalChannel(uc, ch)},
309 })
310 })
[34]311 }
[83]312 case "QUIT":
313 if msg.Prefix == nil {
314 return fmt.Errorf("expected a prefix")
315 }
316
317 if msg.Prefix.Name == uc.nick {
318 uc.logger.Printf("quit")
319 }
320
321 for _, ch := range uc.channels {
322 delete(ch.Members, msg.Prefix.Name)
323 }
324
325 if msg.Prefix.Name != uc.nick {
326 uc.forEachDownstream(func(dc *downstreamConn) {
327 dc.SendMessage(&irc.Message{
328 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
329 Command: "QUIT",
330 Params: msg.Params,
331 })
332 })
333 }
[19]334 case irc.RPL_TOPIC, irc.RPL_NOTOPIC:
[43]335 var name, topic string
336 if err := parseMessageParams(msg, nil, &name, &topic); err != nil {
337 return err
[19]338 }
[55]339 ch, err := uc.getChannel(name)
[19]340 if err != nil {
341 return err
342 }
343 if msg.Command == irc.RPL_TOPIC {
[43]344 ch.Topic = topic
[19]345 } else {
346 ch.Topic = ""
347 }
348 case "TOPIC":
[43]349 var name string
[74]350 if err := parseMessageParams(msg, &name); err != nil {
[43]351 return err
[19]352 }
[55]353 ch, err := uc.getChannel(name)
[19]354 if err != nil {
355 return err
356 }
357 if len(msg.Params) > 1 {
358 ch.Topic = msg.Params[1]
359 } else {
360 ch.Topic = ""
361 }
[74]362 uc.forEachDownstream(func(dc *downstreamConn) {
363 params := []string{dc.marshalChannel(uc, name)}
364 if ch.Topic != "" {
365 params = append(params, ch.Topic)
366 }
367 dc.SendMessage(&irc.Message{
368 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
369 Command: "TOPIC",
370 Params: params,
371 })
372 })
[19]373 case rpl_topicwhotime:
[43]374 var name, who, timeStr string
375 if err := parseMessageParams(msg, nil, &name, &who, &timeStr); err != nil {
376 return err
[19]377 }
[55]378 ch, err := uc.getChannel(name)
[19]379 if err != nil {
380 return err
381 }
[43]382 ch.TopicWho = who
383 sec, err := strconv.ParseInt(timeStr, 10, 64)
[19]384 if err != nil {
385 return fmt.Errorf("failed to parse topic time: %v", err)
386 }
387 ch.TopicTime = time.Unix(sec, 0)
388 case irc.RPL_NAMREPLY:
[43]389 var name, statusStr, members string
390 if err := parseMessageParams(msg, nil, &statusStr, &name, &members); err != nil {
391 return err
[19]392 }
[55]393 ch, err := uc.getChannel(name)
[19]394 if err != nil {
395 return err
396 }
397
[43]398 status, err := parseChannelStatus(statusStr)
[19]399 if err != nil {
400 return err
401 }
402 ch.Status = status
403
[43]404 for _, s := range strings.Split(members, " ") {
[19]405 membership, nick := parseMembershipPrefix(s)
406 ch.Members[nick] = membership
407 }
408 case irc.RPL_ENDOFNAMES:
[43]409 var name string
410 if err := parseMessageParams(msg, nil, &name); err != nil {
411 return err
[25]412 }
[55]413 ch, err := uc.getChannel(name)
[25]414 if err != nil {
415 return err
416 }
417
[34]418 if ch.complete {
419 return fmt.Errorf("received unexpected RPL_ENDOFNAMES")
420 }
[25]421 ch.complete = true
[27]422
[73]423 uc.forEachDownstream(func(dc *downstreamConn) {
[27]424 forwardChannel(dc, ch)
[40]425 })
[36]426 case "PRIVMSG":
[69]427 if err := parseMessageParams(msg, nil, nil); err != nil {
428 return err
429 }
[55]430 uc.ring.Produce(msg)
[16]431 case irc.RPL_YOURHOST, irc.RPL_CREATED:
[14]432 // Ignore
433 case irc.RPL_LUSERCLIENT, irc.RPL_LUSEROP, irc.RPL_LUSERUNKNOWN, irc.RPL_LUSERCHANNELS, irc.RPL_LUSERME:
434 // Ignore
435 case irc.RPL_MOTDSTART, irc.RPL_MOTD, irc.RPL_ENDOFMOTD:
436 // Ignore
437 case rpl_localusers, rpl_globalusers:
438 // Ignore
439 case irc.RPL_STATSVLINE, irc.RPL_STATSPING, irc.RPL_STATSBLINE, irc.RPL_STATSDLINE:
440 // Ignore
[13]441 default:
[55]442 uc.logger.Printf("unhandled upstream message: %v", msg)
[13]443 }
[14]444 return nil
[13]445}
446
[55]447func (uc *upstreamConn) register() {
[77]448 uc.nick = uc.network.Nick
449 uc.username = uc.network.Username
450 if uc.username == "" {
451 uc.username = uc.nick
452 }
453 uc.realname = uc.network.Realname
454 if uc.realname == "" {
455 uc.realname = uc.nick
456 }
457
[60]458 uc.SendMessage(&irc.Message{
[92]459 Command: "CAP",
460 Params: []string{"LS", "302"},
461 })
462
463 uc.SendMessage(&irc.Message{
[13]464 Command: "NICK",
[69]465 Params: []string{uc.nick},
[60]466 })
467 uc.SendMessage(&irc.Message{
[13]468 Command: "USER",
[77]469 Params: []string{uc.username, "0", "*", uc.realname},
[60]470 })
[44]471}
[13]472
[55]473func (uc *upstreamConn) readMessages() error {
[13]474 for {
[55]475 msg, err := uc.irc.ReadMessage()
[13]476 if err == io.EOF {
477 break
478 } else if err != nil {
479 return fmt.Errorf("failed to read IRC command: %v", err)
480 }
481
[64]482 if uc.srv.Debug {
483 uc.logger.Printf("received: %v", msg)
484 }
485
[55]486 if err := uc.handleMessage(msg); err != nil {
487 uc.logger.Printf("failed to handle message %q: %v", msg, err)
[13]488 }
489 }
490
[45]491 return nil
[13]492}
[60]493
494func (uc *upstreamConn) SendMessage(msg *irc.Message) {
495 uc.messages <- msg
496}
Note: See TracBrowser for help on using the repository browser.