source: code/trunk/upstream.go@ 86

Last change on this file since 86 was 83, checked in by contact, 5 years ago

Pass-through QUIT messages

File size: 10.4 KB
Rev Line
[13]1package jounce
2
3import (
4 "crypto/tls"
5 "fmt"
6 "io"
7 "net"
[19]8 "strconv"
[17]9 "strings"
[19]10 "time"
[13]11
12 "gopkg.in/irc.v3"
13)
14
[19]15type upstreamChannel struct {
16 Name string
[46]17 conn *upstreamConn
[19]18 Topic string
19 TopicWho string
20 TopicTime time.Time
21 Status channelStatus
[35]22 modes modeSet
[19]23 Members map[string]membership
[25]24 complete bool
[19]25}
26
[13]27type upstreamConn struct {
[77]28 network *network
[21]29 logger Logger
[19]30 net net.Conn
31 irc *irc.Conn
32 srv *Server
[37]33 user *user
[33]34 messages chan<- *irc.Message
[50]35 ring *Ring
[16]36
37 serverName string
38 availableUserModes string
39 availableChannelModes string
40 channelModesWithParam string
[19]41
42 registered bool
[42]43 nick string
[77]44 username string
45 realname string
[33]46 closed bool
[19]47 modes modeSet
48 channels map[string]*upstreamChannel
[57]49 history map[string]uint64
[13]50}
51
[77]52func connectToUpstream(network *network) (*upstreamConn, error) {
53 logger := &prefixLogger{network.user.srv.Logger, fmt.Sprintf("upstream %q: ", network.Addr)}
[33]54
[77]55 addr := network.Addr
56 if !strings.ContainsRune(addr, ':') {
57 addr = addr + ":6697"
58 }
59
60 logger.Printf("connecting to TLS server at address %q", addr)
61 netConn, err := tls.Dial("tcp", addr, nil)
[33]62 if err != nil {
[77]63 return nil, fmt.Errorf("failed to dial %q: %v", addr, err)
[33]64 }
65
[67]66 setKeepAlive(netConn)
67
[33]68 msgs := make(chan *irc.Message, 64)
[55]69 uc := &upstreamConn{
[79]70 network: network,
[33]71 logger: logger,
72 net: netConn,
73 irc: irc.NewConn(netConn),
[77]74 srv: network.user.srv,
75 user: network.user,
[33]76 messages: msgs,
[77]77 ring: NewRing(network.user.srv.RingCap),
[33]78 channels: make(map[string]*upstreamChannel),
[57]79 history: make(map[string]uint64),
[33]80 }
81
82 go func() {
83 for msg := range msgs {
[64]84 if uc.srv.Debug {
85 uc.logger.Printf("sent: %v", msg)
86 }
[55]87 if err := uc.irc.WriteMessage(msg); err != nil {
88 uc.logger.Printf("failed to write message: %v", err)
[33]89 }
90 }
[55]91 if err := uc.net.Close(); err != nil {
92 uc.logger.Printf("failed to close connection: %v", err)
[45]93 } else {
[55]94 uc.logger.Printf("connection closed")
[45]95 }
[33]96 }()
97
[55]98 return uc, nil
[33]99}
100
[55]101func (uc *upstreamConn) Close() error {
102 if uc.closed {
[33]103 return fmt.Errorf("upstream connection already closed")
104 }
[55]105 close(uc.messages)
106 uc.closed = true
[33]107 return nil
108}
109
[73]110func (uc *upstreamConn) forEachDownstream(f func(*downstreamConn)) {
111 uc.user.forEachDownstream(func(dc *downstreamConn) {
[77]112 if dc.network != nil && dc.network != uc.network {
[73]113 return
114 }
115 f(dc)
116 })
117}
118
[55]119func (uc *upstreamConn) getChannel(name string) (*upstreamChannel, error) {
120 ch, ok := uc.channels[name]
[19]121 if !ok {
122 return nil, fmt.Errorf("unknown channel %q", name)
123 }
124 return ch, nil
125}
126
[55]127func (uc *upstreamConn) handleMessage(msg *irc.Message) error {
[13]128 switch msg.Command {
129 case "PING":
[60]130 uc.SendMessage(&irc.Message{
[13]131 Command: "PONG",
[68]132 Params: msg.Params,
[60]133 })
[33]134 return nil
[17]135 case "MODE":
[69]136 if msg.Prefix == nil {
137 return fmt.Errorf("missing prefix")
138 }
139
[43]140 var name, modeStr string
141 if err := parseMessageParams(msg, &name, &modeStr); err != nil {
142 return err
[17]143 }
[35]144
145 if name == msg.Prefix.Name { // user mode change
[55]146 if name != uc.nick {
[35]147 return fmt.Errorf("received MODE message for unknow nick %q", name)
148 }
[55]149 return uc.modes.Apply(modeStr)
[35]150 } else { // channel mode change
[55]151 ch, err := uc.getChannel(name)
[35]152 if err != nil {
153 return err
154 }
155 if err := ch.modes.Apply(modeStr); err != nil {
156 return err
157 }
[69]158
[73]159 uc.forEachDownstream(func(dc *downstreamConn) {
[69]160 dc.SendMessage(&irc.Message{
161 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
162 Command: "MODE",
163 Params: []string{dc.marshalChannel(uc, name), modeStr},
164 })
165 })
[46]166 }
[18]167 case "NOTICE":
[55]168 uc.logger.Print(msg)
[14]169 case irc.RPL_WELCOME:
[55]170 uc.registered = true
171 uc.logger.Printf("connection registered")
[19]172
[77]173 channels, err := uc.srv.db.ListChannels(uc.network.ID)
174 if err != nil {
175 uc.logger.Printf("failed to list channels from database: %v", err)
176 break
177 }
178
179 for _, ch := range channels {
[60]180 uc.SendMessage(&irc.Message{
[19]181 Command: "JOIN",
[77]182 Params: []string{ch.Name},
[60]183 })
[19]184 }
[16]185 case irc.RPL_MYINFO:
[55]186 if err := parseMessageParams(msg, nil, &uc.serverName, nil, &uc.availableUserModes, &uc.availableChannelModes); err != nil {
[43]187 return err
[16]188 }
189 if len(msg.Params) > 5 {
[55]190 uc.channelModesWithParam = msg.Params[5]
[16]191 }
[42]192 case "NICK":
[83]193 if msg.Prefix == nil {
194 return fmt.Errorf("expected a prefix")
195 }
196
[43]197 var newNick string
198 if err := parseMessageParams(msg, &newNick); err != nil {
199 return err
[42]200 }
201
[55]202 if msg.Prefix.Name == uc.nick {
203 uc.logger.Printf("changed nick from %q to %q", uc.nick, newNick)
204 uc.nick = newNick
[42]205 }
206
[55]207 for _, ch := range uc.channels {
[42]208 if membership, ok := ch.Members[msg.Prefix.Name]; ok {
209 delete(ch.Members, msg.Prefix.Name)
210 ch.Members[newNick] = membership
211 }
212 }
[82]213
214 if msg.Prefix.Name != uc.nick {
215 uc.forEachDownstream(func(dc *downstreamConn) {
216 dc.SendMessage(&irc.Message{
217 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
218 Command: "NICK",
219 Params: []string{newNick},
220 })
221 })
222 }
[69]223 case "JOIN":
224 if msg.Prefix == nil {
225 return fmt.Errorf("expected a prefix")
226 }
[42]227
[43]228 var channels string
229 if err := parseMessageParams(msg, &channels); err != nil {
230 return err
[19]231 }
[34]232
[43]233 for _, ch := range strings.Split(channels, ",") {
[55]234 if msg.Prefix.Name == uc.nick {
235 uc.logger.Printf("joined channel %q", ch)
236 uc.channels[ch] = &upstreamChannel{
[34]237 Name: ch,
[55]238 conn: uc,
[34]239 Members: make(map[string]membership),
240 }
241 } else {
[55]242 ch, err := uc.getChannel(ch)
[34]243 if err != nil {
244 return err
245 }
246 ch.Members[msg.Prefix.Name] = 0
[19]247 }
[69]248
[73]249 uc.forEachDownstream(func(dc *downstreamConn) {
[69]250 dc.SendMessage(&irc.Message{
251 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
252 Command: "JOIN",
253 Params: []string{dc.marshalChannel(uc, ch)},
254 })
255 })
[19]256 }
[69]257 case "PART":
258 if msg.Prefix == nil {
259 return fmt.Errorf("expected a prefix")
260 }
[34]261
[43]262 var channels string
263 if err := parseMessageParams(msg, &channels); err != nil {
264 return err
[34]265 }
266
[43]267 for _, ch := range strings.Split(channels, ",") {
[55]268 if msg.Prefix.Name == uc.nick {
269 uc.logger.Printf("parted channel %q", ch)
270 delete(uc.channels, ch)
[34]271 } else {
[55]272 ch, err := uc.getChannel(ch)
[34]273 if err != nil {
274 return err
275 }
276 delete(ch.Members, msg.Prefix.Name)
277 }
[69]278
[73]279 uc.forEachDownstream(func(dc *downstreamConn) {
[69]280 dc.SendMessage(&irc.Message{
281 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
282 Command: "PART",
283 Params: []string{dc.marshalChannel(uc, ch)},
284 })
285 })
[34]286 }
[83]287 case "QUIT":
288 if msg.Prefix == nil {
289 return fmt.Errorf("expected a prefix")
290 }
291
292 if msg.Prefix.Name == uc.nick {
293 uc.logger.Printf("quit")
294 }
295
296 for _, ch := range uc.channels {
297 delete(ch.Members, msg.Prefix.Name)
298 }
299
300 if msg.Prefix.Name != uc.nick {
301 uc.forEachDownstream(func(dc *downstreamConn) {
302 dc.SendMessage(&irc.Message{
303 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
304 Command: "QUIT",
305 Params: msg.Params,
306 })
307 })
308 }
[19]309 case irc.RPL_TOPIC, irc.RPL_NOTOPIC:
[43]310 var name, topic string
311 if err := parseMessageParams(msg, nil, &name, &topic); err != nil {
312 return err
[19]313 }
[55]314 ch, err := uc.getChannel(name)
[19]315 if err != nil {
316 return err
317 }
318 if msg.Command == irc.RPL_TOPIC {
[43]319 ch.Topic = topic
[19]320 } else {
321 ch.Topic = ""
322 }
323 case "TOPIC":
[43]324 var name string
[74]325 if err := parseMessageParams(msg, &name); err != nil {
[43]326 return err
[19]327 }
[55]328 ch, err := uc.getChannel(name)
[19]329 if err != nil {
330 return err
331 }
332 if len(msg.Params) > 1 {
333 ch.Topic = msg.Params[1]
334 } else {
335 ch.Topic = ""
336 }
[74]337 uc.forEachDownstream(func(dc *downstreamConn) {
338 params := []string{dc.marshalChannel(uc, name)}
339 if ch.Topic != "" {
340 params = append(params, ch.Topic)
341 }
342 dc.SendMessage(&irc.Message{
343 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
344 Command: "TOPIC",
345 Params: params,
346 })
347 })
[19]348 case rpl_topicwhotime:
[43]349 var name, who, timeStr string
350 if err := parseMessageParams(msg, nil, &name, &who, &timeStr); err != nil {
351 return err
[19]352 }
[55]353 ch, err := uc.getChannel(name)
[19]354 if err != nil {
355 return err
356 }
[43]357 ch.TopicWho = who
358 sec, err := strconv.ParseInt(timeStr, 10, 64)
[19]359 if err != nil {
360 return fmt.Errorf("failed to parse topic time: %v", err)
361 }
362 ch.TopicTime = time.Unix(sec, 0)
363 case irc.RPL_NAMREPLY:
[43]364 var name, statusStr, members string
365 if err := parseMessageParams(msg, nil, &statusStr, &name, &members); err != nil {
366 return err
[19]367 }
[55]368 ch, err := uc.getChannel(name)
[19]369 if err != nil {
370 return err
371 }
372
[43]373 status, err := parseChannelStatus(statusStr)
[19]374 if err != nil {
375 return err
376 }
377 ch.Status = status
378
[43]379 for _, s := range strings.Split(members, " ") {
[19]380 membership, nick := parseMembershipPrefix(s)
381 ch.Members[nick] = membership
382 }
383 case irc.RPL_ENDOFNAMES:
[43]384 var name string
385 if err := parseMessageParams(msg, nil, &name); err != nil {
386 return err
[25]387 }
[55]388 ch, err := uc.getChannel(name)
[25]389 if err != nil {
390 return err
391 }
392
[34]393 if ch.complete {
394 return fmt.Errorf("received unexpected RPL_ENDOFNAMES")
395 }
[25]396 ch.complete = true
[27]397
[73]398 uc.forEachDownstream(func(dc *downstreamConn) {
[27]399 forwardChannel(dc, ch)
[40]400 })
[36]401 case "PRIVMSG":
[69]402 if err := parseMessageParams(msg, nil, nil); err != nil {
403 return err
404 }
[55]405 uc.ring.Produce(msg)
[16]406 case irc.RPL_YOURHOST, irc.RPL_CREATED:
[14]407 // Ignore
408 case irc.RPL_LUSERCLIENT, irc.RPL_LUSEROP, irc.RPL_LUSERUNKNOWN, irc.RPL_LUSERCHANNELS, irc.RPL_LUSERME:
409 // Ignore
410 case irc.RPL_MOTDSTART, irc.RPL_MOTD, irc.RPL_ENDOFMOTD:
411 // Ignore
412 case rpl_localusers, rpl_globalusers:
413 // Ignore
414 case irc.RPL_STATSVLINE, irc.RPL_STATSPING, irc.RPL_STATSBLINE, irc.RPL_STATSDLINE:
415 // Ignore
[13]416 default:
[55]417 uc.logger.Printf("unhandled upstream message: %v", msg)
[13]418 }
[14]419 return nil
[13]420}
421
[55]422func (uc *upstreamConn) register() {
[77]423 uc.nick = uc.network.Nick
424 uc.username = uc.network.Username
425 if uc.username == "" {
426 uc.username = uc.nick
427 }
428 uc.realname = uc.network.Realname
429 if uc.realname == "" {
430 uc.realname = uc.nick
431 }
432
[60]433 uc.SendMessage(&irc.Message{
[13]434 Command: "NICK",
[69]435 Params: []string{uc.nick},
[60]436 })
437 uc.SendMessage(&irc.Message{
[13]438 Command: "USER",
[77]439 Params: []string{uc.username, "0", "*", uc.realname},
[60]440 })
[44]441}
[13]442
[55]443func (uc *upstreamConn) readMessages() error {
[13]444 for {
[55]445 msg, err := uc.irc.ReadMessage()
[13]446 if err == io.EOF {
447 break
448 } else if err != nil {
449 return fmt.Errorf("failed to read IRC command: %v", err)
450 }
451
[64]452 if uc.srv.Debug {
453 uc.logger.Printf("received: %v", msg)
454 }
455
[55]456 if err := uc.handleMessage(msg); err != nil {
457 uc.logger.Printf("failed to handle message %q: %v", msg, err)
[13]458 }
459 }
460
[45]461 return nil
[13]462}
[60]463
464func (uc *upstreamConn) SendMessage(msg *irc.Message) {
465 uc.messages <- msg
466}
Note: See TracBrowser for help on using the repository browser.