source: code/trunk/upstream.go@ 55

Last change on this file since 55 was 55, checked in by contact, 5 years ago

Abbreviate {upstream,downstream}Conn with uc and dc

This makes it clearer than just c when we manipulate both kinds at the
same time.

File size: 7.9 KB
RevLine 
[13]1package jounce
2
3import (
4 "crypto/tls"
5 "fmt"
6 "io"
7 "net"
[19]8 "strconv"
[17]9 "strings"
[19]10 "time"
[13]11
12 "gopkg.in/irc.v3"
13)
14
[19]15type upstreamChannel struct {
16 Name string
[46]17 conn *upstreamConn
[19]18 Topic string
19 TopicWho string
20 TopicTime time.Time
21 Status channelStatus
[35]22 modes modeSet
[19]23 Members map[string]membership
[25]24 complete bool
[19]25}
26
[13]27type upstreamConn struct {
[19]28 upstream *Upstream
[21]29 logger Logger
[19]30 net net.Conn
31 irc *irc.Conn
32 srv *Server
[37]33 user *user
[33]34 messages chan<- *irc.Message
[50]35 ring *Ring
[16]36
37 serverName string
38 availableUserModes string
39 availableChannelModes string
40 channelModesWithParam string
[19]41
42 registered bool
[42]43 nick string
[33]44 closed bool
[19]45 modes modeSet
46 channels map[string]*upstreamChannel
[13]47}
48
[37]49func connectToUpstream(u *user, upstream *Upstream) (*upstreamConn, error) {
50 logger := &prefixLogger{u.srv.Logger, fmt.Sprintf("upstream %q: ", upstream.Addr)}
[33]51 logger.Printf("connecting to server")
52
53 netConn, err := tls.Dial("tcp", upstream.Addr, nil)
54 if err != nil {
55 return nil, fmt.Errorf("failed to dial %q: %v", upstream.Addr, err)
56 }
57
58 msgs := make(chan *irc.Message, 64)
[55]59 uc := &upstreamConn{
[33]60 upstream: upstream,
61 logger: logger,
62 net: netConn,
63 irc: irc.NewConn(netConn),
[37]64 srv: u.srv,
65 user: u,
[33]66 messages: msgs,
[50]67 ring: NewRing(u.srv.RingCap),
[33]68 channels: make(map[string]*upstreamChannel),
69 }
70
71 go func() {
72 for msg := range msgs {
[55]73 if err := uc.irc.WriteMessage(msg); err != nil {
74 uc.logger.Printf("failed to write message: %v", err)
[33]75 }
76 }
[55]77 if err := uc.net.Close(); err != nil {
78 uc.logger.Printf("failed to close connection: %v", err)
[45]79 } else {
[55]80 uc.logger.Printf("connection closed")
[45]81 }
[33]82 }()
83
[55]84 return uc, nil
[33]85}
86
[55]87func (uc *upstreamConn) Close() error {
88 if uc.closed {
[33]89 return fmt.Errorf("upstream connection already closed")
90 }
[55]91 close(uc.messages)
92 uc.closed = true
[33]93 return nil
94}
95
[55]96func (uc *upstreamConn) getChannel(name string) (*upstreamChannel, error) {
97 ch, ok := uc.channels[name]
[19]98 if !ok {
99 return nil, fmt.Errorf("unknown channel %q", name)
100 }
101 return ch, nil
102}
103
[55]104func (uc *upstreamConn) handleMessage(msg *irc.Message) error {
[13]105 switch msg.Command {
106 case "PING":
107 // TODO: handle params
[55]108 uc.messages <- &irc.Message{
[13]109 Command: "PONG",
[55]110 Params: []string{uc.srv.Hostname},
[33]111 }
112 return nil
[17]113 case "MODE":
[43]114 var name, modeStr string
115 if err := parseMessageParams(msg, &name, &modeStr); err != nil {
116 return err
[17]117 }
[35]118
119 if name == msg.Prefix.Name { // user mode change
[55]120 if name != uc.nick {
[35]121 return fmt.Errorf("received MODE message for unknow nick %q", name)
122 }
[55]123 return uc.modes.Apply(modeStr)
[35]124 } else { // channel mode change
[55]125 ch, err := uc.getChannel(name)
[35]126 if err != nil {
127 return err
128 }
129 if err := ch.modes.Apply(modeStr); err != nil {
130 return err
131 }
[46]132 }
[35]133
[55]134 uc.user.forEachDownstream(func(dc *downstreamConn) {
[54]135 dc.SendMessage(msg)
[46]136 })
[18]137 case "NOTICE":
[55]138 uc.logger.Print(msg)
[14]139 case irc.RPL_WELCOME:
[55]140 uc.registered = true
141 uc.logger.Printf("connection registered")
[19]142
[55]143 for _, ch := range uc.upstream.Channels {
144 uc.messages <- &irc.Message{
[19]145 Command: "JOIN",
146 Params: []string{ch},
147 }
148 }
[16]149 case irc.RPL_MYINFO:
[55]150 if err := parseMessageParams(msg, nil, &uc.serverName, nil, &uc.availableUserModes, &uc.availableChannelModes); err != nil {
[43]151 return err
[16]152 }
153 if len(msg.Params) > 5 {
[55]154 uc.channelModesWithParam = msg.Params[5]
[16]155 }
[42]156 case "NICK":
[43]157 var newNick string
158 if err := parseMessageParams(msg, &newNick); err != nil {
159 return err
[42]160 }
161
[55]162 if msg.Prefix.Name == uc.nick {
163 uc.logger.Printf("changed nick from %q to %q", uc.nick, newNick)
164 uc.nick = newNick
[42]165 }
166
[55]167 for _, ch := range uc.channels {
[42]168 if membership, ok := ch.Members[msg.Prefix.Name]; ok {
169 delete(ch.Members, msg.Prefix.Name)
170 ch.Members[newNick] = membership
171 }
172 }
173
[55]174 uc.user.forEachDownstream(func(dc *downstreamConn) {
[54]175 dc.SendMessage(msg)
[42]176 })
[19]177 case "JOIN":
[43]178 var channels string
179 if err := parseMessageParams(msg, &channels); err != nil {
180 return err
[19]181 }
[34]182
[43]183 for _, ch := range strings.Split(channels, ",") {
[55]184 if msg.Prefix.Name == uc.nick {
185 uc.logger.Printf("joined channel %q", ch)
186 uc.channels[ch] = &upstreamChannel{
[34]187 Name: ch,
[55]188 conn: uc,
[34]189 Members: make(map[string]membership),
190 }
191 } else {
[55]192 ch, err := uc.getChannel(ch)
[34]193 if err != nil {
194 return err
195 }
196 ch.Members[msg.Prefix.Name] = 0
[19]197 }
198 }
[34]199
[55]200 uc.user.forEachDownstream(func(dc *downstreamConn) {
[54]201 dc.SendMessage(msg)
[40]202 })
[34]203 case "PART":
[43]204 var channels string
205 if err := parseMessageParams(msg, &channels); err != nil {
206 return err
[34]207 }
208
[43]209 for _, ch := range strings.Split(channels, ",") {
[55]210 if msg.Prefix.Name == uc.nick {
211 uc.logger.Printf("parted channel %q", ch)
212 delete(uc.channels, ch)
[34]213 } else {
[55]214 ch, err := uc.getChannel(ch)
[34]215 if err != nil {
216 return err
217 }
218 delete(ch.Members, msg.Prefix.Name)
219 }
220 }
221
[55]222 uc.user.forEachDownstream(func(dc *downstreamConn) {
[54]223 dc.SendMessage(msg)
[40]224 })
[19]225 case irc.RPL_TOPIC, irc.RPL_NOTOPIC:
[43]226 var name, topic string
227 if err := parseMessageParams(msg, nil, &name, &topic); err != nil {
228 return err
[19]229 }
[55]230 ch, err := uc.getChannel(name)
[19]231 if err != nil {
232 return err
233 }
234 if msg.Command == irc.RPL_TOPIC {
[43]235 ch.Topic = topic
[19]236 } else {
237 ch.Topic = ""
238 }
239 case "TOPIC":
[43]240 var name string
241 if err := parseMessageParams(msg, nil, &name); err != nil {
242 return err
[19]243 }
[55]244 ch, err := uc.getChannel(name)
[19]245 if err != nil {
246 return err
247 }
248 if len(msg.Params) > 1 {
249 ch.Topic = msg.Params[1]
250 } else {
251 ch.Topic = ""
252 }
253 case rpl_topicwhotime:
[43]254 var name, who, timeStr string
255 if err := parseMessageParams(msg, nil, &name, &who, &timeStr); err != nil {
256 return err
[19]257 }
[55]258 ch, err := uc.getChannel(name)
[19]259 if err != nil {
260 return err
261 }
[43]262 ch.TopicWho = who
263 sec, err := strconv.ParseInt(timeStr, 10, 64)
[19]264 if err != nil {
265 return fmt.Errorf("failed to parse topic time: %v", err)
266 }
267 ch.TopicTime = time.Unix(sec, 0)
268 case irc.RPL_NAMREPLY:
[43]269 var name, statusStr, members string
270 if err := parseMessageParams(msg, nil, &statusStr, &name, &members); err != nil {
271 return err
[19]272 }
[55]273 ch, err := uc.getChannel(name)
[19]274 if err != nil {
275 return err
276 }
277
[43]278 status, err := parseChannelStatus(statusStr)
[19]279 if err != nil {
280 return err
281 }
282 ch.Status = status
283
[43]284 for _, s := range strings.Split(members, " ") {
[19]285 membership, nick := parseMembershipPrefix(s)
286 ch.Members[nick] = membership
287 }
288 case irc.RPL_ENDOFNAMES:
[43]289 var name string
290 if err := parseMessageParams(msg, nil, &name); err != nil {
291 return err
[25]292 }
[55]293 ch, err := uc.getChannel(name)
[25]294 if err != nil {
295 return err
296 }
297
[34]298 if ch.complete {
299 return fmt.Errorf("received unexpected RPL_ENDOFNAMES")
300 }
[25]301 ch.complete = true
[27]302
[55]303 uc.user.forEachDownstream(func(dc *downstreamConn) {
[27]304 forwardChannel(dc, ch)
[40]305 })
[36]306 case "PRIVMSG":
[55]307 uc.ring.Produce(msg)
308 uc.user.forEachDownstream(func(dc *downstreamConn) {
[54]309 dc.SendMessage(msg)
[40]310 })
[16]311 case irc.RPL_YOURHOST, irc.RPL_CREATED:
[14]312 // Ignore
313 case irc.RPL_LUSERCLIENT, irc.RPL_LUSEROP, irc.RPL_LUSERUNKNOWN, irc.RPL_LUSERCHANNELS, irc.RPL_LUSERME:
314 // Ignore
315 case irc.RPL_MOTDSTART, irc.RPL_MOTD, irc.RPL_ENDOFMOTD:
316 // Ignore
317 case rpl_localusers, rpl_globalusers:
318 // Ignore
319 case irc.RPL_STATSVLINE, irc.RPL_STATSPING, irc.RPL_STATSBLINE, irc.RPL_STATSDLINE:
320 // Ignore
[13]321 default:
[55]322 uc.logger.Printf("unhandled upstream message: %v", msg)
[13]323 }
[14]324 return nil
[13]325}
326
[55]327func (uc *upstreamConn) register() {
328 uc.nick = uc.upstream.Nick
329 uc.messages <- &irc.Message{
[13]330 Command: "NICK",
[55]331 Params: []string{uc.upstream.Nick},
[13]332 }
[55]333 uc.messages <- &irc.Message{
[13]334 Command: "USER",
[55]335 Params: []string{uc.upstream.Username, "0", "*", uc.upstream.Realname},
[13]336 }
[44]337}
[13]338
[55]339func (uc *upstreamConn) readMessages() error {
[13]340 for {
[55]341 msg, err := uc.irc.ReadMessage()
[13]342 if err == io.EOF {
343 break
344 } else if err != nil {
345 return fmt.Errorf("failed to read IRC command: %v", err)
346 }
347
[55]348 if err := uc.handleMessage(msg); err != nil {
349 uc.logger.Printf("failed to handle message %q: %v", msg, err)
[13]350 }
351 }
352
[45]353 return nil
[13]354}
Note: See TracBrowser for help on using the repository browser.