source: code/trunk/upstream.go@ 66

Last change on this file since 66 was 66, checked in by contact, 5 years ago

Properly handle PING messages

File size: 8.4 KB
RevLine 
[13]1package jounce
2
3import (
4 "crypto/tls"
5 "fmt"
6 "io"
7 "net"
[19]8 "strconv"
[17]9 "strings"
[19]10 "time"
[13]11
12 "gopkg.in/irc.v3"
13)
14
[19]15type upstreamChannel struct {
16 Name string
[46]17 conn *upstreamConn
[19]18 Topic string
19 TopicWho string
20 TopicTime time.Time
21 Status channelStatus
[35]22 modes modeSet
[19]23 Members map[string]membership
[25]24 complete bool
[19]25}
26
[13]27type upstreamConn struct {
[19]28 upstream *Upstream
[21]29 logger Logger
[19]30 net net.Conn
31 irc *irc.Conn
32 srv *Server
[37]33 user *user
[33]34 messages chan<- *irc.Message
[50]35 ring *Ring
[16]36
37 serverName string
38 availableUserModes string
39 availableChannelModes string
40 channelModesWithParam string
[19]41
42 registered bool
[42]43 nick string
[33]44 closed bool
[19]45 modes modeSet
46 channels map[string]*upstreamChannel
[57]47 history map[string]uint64
[13]48}
49
[37]50func connectToUpstream(u *user, upstream *Upstream) (*upstreamConn, error) {
51 logger := &prefixLogger{u.srv.Logger, fmt.Sprintf("upstream %q: ", upstream.Addr)}
[33]52 logger.Printf("connecting to server")
53
54 netConn, err := tls.Dial("tcp", upstream.Addr, nil)
55 if err != nil {
56 return nil, fmt.Errorf("failed to dial %q: %v", upstream.Addr, err)
57 }
58
59 msgs := make(chan *irc.Message, 64)
[55]60 uc := &upstreamConn{
[33]61 upstream: upstream,
62 logger: logger,
63 net: netConn,
64 irc: irc.NewConn(netConn),
[37]65 srv: u.srv,
66 user: u,
[33]67 messages: msgs,
[50]68 ring: NewRing(u.srv.RingCap),
[33]69 channels: make(map[string]*upstreamChannel),
[57]70 history: make(map[string]uint64),
[33]71 }
72
73 go func() {
74 for msg := range msgs {
[64]75 if uc.srv.Debug {
76 uc.logger.Printf("sent: %v", msg)
77 }
[55]78 if err := uc.irc.WriteMessage(msg); err != nil {
79 uc.logger.Printf("failed to write message: %v", err)
[33]80 }
81 }
[55]82 if err := uc.net.Close(); err != nil {
83 uc.logger.Printf("failed to close connection: %v", err)
[45]84 } else {
[55]85 uc.logger.Printf("connection closed")
[45]86 }
[33]87 }()
88
[55]89 return uc, nil
[33]90}
91
[55]92func (uc *upstreamConn) Close() error {
93 if uc.closed {
[33]94 return fmt.Errorf("upstream connection already closed")
95 }
[55]96 close(uc.messages)
97 uc.closed = true
[33]98 return nil
99}
100
[55]101func (uc *upstreamConn) getChannel(name string) (*upstreamChannel, error) {
102 ch, ok := uc.channels[name]
[19]103 if !ok {
104 return nil, fmt.Errorf("unknown channel %q", name)
105 }
106 return ch, nil
107}
108
[55]109func (uc *upstreamConn) handleMessage(msg *irc.Message) error {
[13]110 switch msg.Command {
111 case "PING":
[66]112 var from, to string
113 if len(msg.Params) >= 1 {
114 from = msg.Params[0]
115 }
116 if len(msg.Params) >= 2 {
117 to = msg.Params[1]
118 }
119
120 if to != "" && to != uc.srv.Hostname {
121 return fmt.Errorf("invalid PING destination %q", to)
122 }
123
124 params := []string{uc.srv.Hostname}
125 if from != "" {
126 params = append(params, from)
127 }
[60]128 uc.SendMessage(&irc.Message{
[13]129 Command: "PONG",
[66]130 Params: params,
[60]131 })
[33]132 return nil
[17]133 case "MODE":
[43]134 var name, modeStr string
135 if err := parseMessageParams(msg, &name, &modeStr); err != nil {
136 return err
[17]137 }
[35]138
139 if name == msg.Prefix.Name { // user mode change
[55]140 if name != uc.nick {
[35]141 return fmt.Errorf("received MODE message for unknow nick %q", name)
142 }
[55]143 return uc.modes.Apply(modeStr)
[35]144 } else { // channel mode change
[55]145 ch, err := uc.getChannel(name)
[35]146 if err != nil {
147 return err
148 }
149 if err := ch.modes.Apply(modeStr); err != nil {
150 return err
151 }
[46]152 }
[35]153
[55]154 uc.user.forEachDownstream(func(dc *downstreamConn) {
[54]155 dc.SendMessage(msg)
[46]156 })
[18]157 case "NOTICE":
[55]158 uc.logger.Print(msg)
[14]159 case irc.RPL_WELCOME:
[55]160 uc.registered = true
161 uc.logger.Printf("connection registered")
[19]162
[55]163 for _, ch := range uc.upstream.Channels {
[60]164 uc.SendMessage(&irc.Message{
[19]165 Command: "JOIN",
166 Params: []string{ch},
[60]167 })
[19]168 }
[16]169 case irc.RPL_MYINFO:
[55]170 if err := parseMessageParams(msg, nil, &uc.serverName, nil, &uc.availableUserModes, &uc.availableChannelModes); err != nil {
[43]171 return err
[16]172 }
173 if len(msg.Params) > 5 {
[55]174 uc.channelModesWithParam = msg.Params[5]
[16]175 }
[42]176 case "NICK":
[43]177 var newNick string
178 if err := parseMessageParams(msg, &newNick); err != nil {
179 return err
[42]180 }
181
[55]182 if msg.Prefix.Name == uc.nick {
183 uc.logger.Printf("changed nick from %q to %q", uc.nick, newNick)
184 uc.nick = newNick
[42]185 }
186
[55]187 for _, ch := range uc.channels {
[42]188 if membership, ok := ch.Members[msg.Prefix.Name]; ok {
189 delete(ch.Members, msg.Prefix.Name)
190 ch.Members[newNick] = membership
191 }
192 }
193
[55]194 uc.user.forEachDownstream(func(dc *downstreamConn) {
[54]195 dc.SendMessage(msg)
[42]196 })
[19]197 case "JOIN":
[43]198 var channels string
199 if err := parseMessageParams(msg, &channels); err != nil {
200 return err
[19]201 }
[34]202
[43]203 for _, ch := range strings.Split(channels, ",") {
[55]204 if msg.Prefix.Name == uc.nick {
205 uc.logger.Printf("joined channel %q", ch)
206 uc.channels[ch] = &upstreamChannel{
[34]207 Name: ch,
[55]208 conn: uc,
[34]209 Members: make(map[string]membership),
210 }
211 } else {
[55]212 ch, err := uc.getChannel(ch)
[34]213 if err != nil {
214 return err
215 }
216 ch.Members[msg.Prefix.Name] = 0
[19]217 }
218 }
[34]219
[55]220 uc.user.forEachDownstream(func(dc *downstreamConn) {
[54]221 dc.SendMessage(msg)
[40]222 })
[34]223 case "PART":
[43]224 var channels string
225 if err := parseMessageParams(msg, &channels); err != nil {
226 return err
[34]227 }
228
[43]229 for _, ch := range strings.Split(channels, ",") {
[55]230 if msg.Prefix.Name == uc.nick {
231 uc.logger.Printf("parted channel %q", ch)
232 delete(uc.channels, ch)
[34]233 } else {
[55]234 ch, err := uc.getChannel(ch)
[34]235 if err != nil {
236 return err
237 }
238 delete(ch.Members, msg.Prefix.Name)
239 }
240 }
241
[55]242 uc.user.forEachDownstream(func(dc *downstreamConn) {
[54]243 dc.SendMessage(msg)
[40]244 })
[19]245 case irc.RPL_TOPIC, irc.RPL_NOTOPIC:
[43]246 var name, topic string
247 if err := parseMessageParams(msg, nil, &name, &topic); err != nil {
248 return err
[19]249 }
[55]250 ch, err := uc.getChannel(name)
[19]251 if err != nil {
252 return err
253 }
254 if msg.Command == irc.RPL_TOPIC {
[43]255 ch.Topic = topic
[19]256 } else {
257 ch.Topic = ""
258 }
259 case "TOPIC":
[43]260 var name string
261 if err := parseMessageParams(msg, nil, &name); err != nil {
262 return err
[19]263 }
[55]264 ch, err := uc.getChannel(name)
[19]265 if err != nil {
266 return err
267 }
268 if len(msg.Params) > 1 {
269 ch.Topic = msg.Params[1]
270 } else {
271 ch.Topic = ""
272 }
273 case rpl_topicwhotime:
[43]274 var name, who, timeStr string
275 if err := parseMessageParams(msg, nil, &name, &who, &timeStr); err != nil {
276 return err
[19]277 }
[55]278 ch, err := uc.getChannel(name)
[19]279 if err != nil {
280 return err
281 }
[43]282 ch.TopicWho = who
283 sec, err := strconv.ParseInt(timeStr, 10, 64)
[19]284 if err != nil {
285 return fmt.Errorf("failed to parse topic time: %v", err)
286 }
287 ch.TopicTime = time.Unix(sec, 0)
288 case irc.RPL_NAMREPLY:
[43]289 var name, statusStr, members string
290 if err := parseMessageParams(msg, nil, &statusStr, &name, &members); err != nil {
291 return err
[19]292 }
[55]293 ch, err := uc.getChannel(name)
[19]294 if err != nil {
295 return err
296 }
297
[43]298 status, err := parseChannelStatus(statusStr)
[19]299 if err != nil {
300 return err
301 }
302 ch.Status = status
303
[43]304 for _, s := range strings.Split(members, " ") {
[19]305 membership, nick := parseMembershipPrefix(s)
306 ch.Members[nick] = membership
307 }
308 case irc.RPL_ENDOFNAMES:
[43]309 var name string
310 if err := parseMessageParams(msg, nil, &name); err != nil {
311 return err
[25]312 }
[55]313 ch, err := uc.getChannel(name)
[25]314 if err != nil {
315 return err
316 }
317
[34]318 if ch.complete {
319 return fmt.Errorf("received unexpected RPL_ENDOFNAMES")
320 }
[25]321 ch.complete = true
[27]322
[55]323 uc.user.forEachDownstream(func(dc *downstreamConn) {
[27]324 forwardChannel(dc, ch)
[40]325 })
[36]326 case "PRIVMSG":
[55]327 uc.ring.Produce(msg)
[16]328 case irc.RPL_YOURHOST, irc.RPL_CREATED:
[14]329 // Ignore
330 case irc.RPL_LUSERCLIENT, irc.RPL_LUSEROP, irc.RPL_LUSERUNKNOWN, irc.RPL_LUSERCHANNELS, irc.RPL_LUSERME:
331 // Ignore
332 case irc.RPL_MOTDSTART, irc.RPL_MOTD, irc.RPL_ENDOFMOTD:
333 // Ignore
334 case rpl_localusers, rpl_globalusers:
335 // Ignore
336 case irc.RPL_STATSVLINE, irc.RPL_STATSPING, irc.RPL_STATSBLINE, irc.RPL_STATSDLINE:
337 // Ignore
[13]338 default:
[55]339 uc.logger.Printf("unhandled upstream message: %v", msg)
[13]340 }
[14]341 return nil
[13]342}
343
[55]344func (uc *upstreamConn) register() {
345 uc.nick = uc.upstream.Nick
[60]346 uc.SendMessage(&irc.Message{
[13]347 Command: "NICK",
[55]348 Params: []string{uc.upstream.Nick},
[60]349 })
350 uc.SendMessage(&irc.Message{
[13]351 Command: "USER",
[55]352 Params: []string{uc.upstream.Username, "0", "*", uc.upstream.Realname},
[60]353 })
[44]354}
[13]355
[55]356func (uc *upstreamConn) readMessages() error {
[13]357 for {
[55]358 msg, err := uc.irc.ReadMessage()
[13]359 if err == io.EOF {
360 break
361 } else if err != nil {
362 return fmt.Errorf("failed to read IRC command: %v", err)
363 }
364
[64]365 if uc.srv.Debug {
366 uc.logger.Printf("received: %v", msg)
367 }
368
[55]369 if err := uc.handleMessage(msg); err != nil {
370 uc.logger.Printf("failed to handle message %q: %v", msg, err)
[13]371 }
372 }
373
[45]374 return nil
[13]375}
[60]376
377func (uc *upstreamConn) SendMessage(msg *irc.Message) {
378 uc.messages <- msg
379}
Note: See TracBrowser for help on using the repository browser.