source: code/trunk/upstream.go@ 69

Last change on this file since 69 was 69, checked in by contact, 5 years ago

Add functions to translate between upstream and downstream names

File size: 8.8 KB
RevLine 
[13]1package jounce
2
3import (
4 "crypto/tls"
5 "fmt"
6 "io"
7 "net"
[19]8 "strconv"
[17]9 "strings"
[19]10 "time"
[13]11
12 "gopkg.in/irc.v3"
13)
14
[19]15type upstreamChannel struct {
16 Name string
[46]17 conn *upstreamConn
[19]18 Topic string
19 TopicWho string
20 TopicTime time.Time
21 Status channelStatus
[35]22 modes modeSet
[19]23 Members map[string]membership
[25]24 complete bool
[19]25}
26
[13]27type upstreamConn struct {
[19]28 upstream *Upstream
[21]29 logger Logger
[19]30 net net.Conn
31 irc *irc.Conn
32 srv *Server
[37]33 user *user
[33]34 messages chan<- *irc.Message
[50]35 ring *Ring
[16]36
37 serverName string
38 availableUserModes string
39 availableChannelModes string
40 channelModesWithParam string
[19]41
42 registered bool
[42]43 nick string
[33]44 closed bool
[19]45 modes modeSet
46 channels map[string]*upstreamChannel
[57]47 history map[string]uint64
[13]48}
49
[37]50func connectToUpstream(u *user, upstream *Upstream) (*upstreamConn, error) {
51 logger := &prefixLogger{u.srv.Logger, fmt.Sprintf("upstream %q: ", upstream.Addr)}
[33]52 logger.Printf("connecting to server")
53
54 netConn, err := tls.Dial("tcp", upstream.Addr, nil)
55 if err != nil {
56 return nil, fmt.Errorf("failed to dial %q: %v", upstream.Addr, err)
57 }
58
[67]59 setKeepAlive(netConn)
60
[33]61 msgs := make(chan *irc.Message, 64)
[55]62 uc := &upstreamConn{
[33]63 upstream: upstream,
64 logger: logger,
65 net: netConn,
66 irc: irc.NewConn(netConn),
[37]67 srv: u.srv,
68 user: u,
[33]69 messages: msgs,
[50]70 ring: NewRing(u.srv.RingCap),
[33]71 channels: make(map[string]*upstreamChannel),
[57]72 history: make(map[string]uint64),
[33]73 }
74
75 go func() {
76 for msg := range msgs {
[64]77 if uc.srv.Debug {
78 uc.logger.Printf("sent: %v", msg)
79 }
[55]80 if err := uc.irc.WriteMessage(msg); err != nil {
81 uc.logger.Printf("failed to write message: %v", err)
[33]82 }
83 }
[55]84 if err := uc.net.Close(); err != nil {
85 uc.logger.Printf("failed to close connection: %v", err)
[45]86 } else {
[55]87 uc.logger.Printf("connection closed")
[45]88 }
[33]89 }()
90
[55]91 return uc, nil
[33]92}
93
[69]94func (uc *upstreamConn) prefix() *irc.Prefix {
95 return &irc.Prefix{
96 Name: uc.nick,
97 User: uc.upstream.Username,
98 // TODO: fill the host?
99 }
100}
101
[55]102func (uc *upstreamConn) Close() error {
103 if uc.closed {
[33]104 return fmt.Errorf("upstream connection already closed")
105 }
[55]106 close(uc.messages)
107 uc.closed = true
[33]108 return nil
109}
110
[55]111func (uc *upstreamConn) getChannel(name string) (*upstreamChannel, error) {
112 ch, ok := uc.channels[name]
[19]113 if !ok {
114 return nil, fmt.Errorf("unknown channel %q", name)
115 }
116 return ch, nil
117}
118
[55]119func (uc *upstreamConn) handleMessage(msg *irc.Message) error {
[13]120 switch msg.Command {
121 case "PING":
[60]122 uc.SendMessage(&irc.Message{
[13]123 Command: "PONG",
[68]124 Params: msg.Params,
[60]125 })
[33]126 return nil
[17]127 case "MODE":
[69]128 if msg.Prefix == nil {
129 return fmt.Errorf("missing prefix")
130 }
131
[43]132 var name, modeStr string
133 if err := parseMessageParams(msg, &name, &modeStr); err != nil {
134 return err
[17]135 }
[35]136
137 if name == msg.Prefix.Name { // user mode change
[55]138 if name != uc.nick {
[35]139 return fmt.Errorf("received MODE message for unknow nick %q", name)
140 }
[55]141 return uc.modes.Apply(modeStr)
[35]142 } else { // channel mode change
[55]143 ch, err := uc.getChannel(name)
[35]144 if err != nil {
145 return err
146 }
147 if err := ch.modes.Apply(modeStr); err != nil {
148 return err
149 }
[69]150
151 uc.user.forEachDownstream(func(dc *downstreamConn) {
152 dc.SendMessage(&irc.Message{
153 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
154 Command: "MODE",
155 Params: []string{dc.marshalChannel(uc, name), modeStr},
156 })
157 })
[46]158 }
[18]159 case "NOTICE":
[55]160 uc.logger.Print(msg)
[14]161 case irc.RPL_WELCOME:
[55]162 uc.registered = true
163 uc.logger.Printf("connection registered")
[19]164
[55]165 for _, ch := range uc.upstream.Channels {
[60]166 uc.SendMessage(&irc.Message{
[19]167 Command: "JOIN",
168 Params: []string{ch},
[60]169 })
[19]170 }
[16]171 case irc.RPL_MYINFO:
[55]172 if err := parseMessageParams(msg, nil, &uc.serverName, nil, &uc.availableUserModes, &uc.availableChannelModes); err != nil {
[43]173 return err
[16]174 }
175 if len(msg.Params) > 5 {
[55]176 uc.channelModesWithParam = msg.Params[5]
[16]177 }
[42]178 case "NICK":
[43]179 var newNick string
180 if err := parseMessageParams(msg, &newNick); err != nil {
181 return err
[42]182 }
183
[55]184 if msg.Prefix.Name == uc.nick {
185 uc.logger.Printf("changed nick from %q to %q", uc.nick, newNick)
186 uc.nick = newNick
[42]187 }
188
[55]189 for _, ch := range uc.channels {
[42]190 if membership, ok := ch.Members[msg.Prefix.Name]; ok {
191 delete(ch.Members, msg.Prefix.Name)
192 ch.Members[newNick] = membership
193 }
194 }
[69]195 case "JOIN":
196 if msg.Prefix == nil {
197 return fmt.Errorf("expected a prefix")
198 }
[42]199
[43]200 var channels string
201 if err := parseMessageParams(msg, &channels); err != nil {
202 return err
[19]203 }
[34]204
[43]205 for _, ch := range strings.Split(channels, ",") {
[55]206 if msg.Prefix.Name == uc.nick {
207 uc.logger.Printf("joined channel %q", ch)
208 uc.channels[ch] = &upstreamChannel{
[34]209 Name: ch,
[55]210 conn: uc,
[34]211 Members: make(map[string]membership),
212 }
213 } else {
[55]214 ch, err := uc.getChannel(ch)
[34]215 if err != nil {
216 return err
217 }
218 ch.Members[msg.Prefix.Name] = 0
[19]219 }
[69]220
221 uc.user.forEachDownstream(func(dc *downstreamConn) {
222 dc.SendMessage(&irc.Message{
223 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
224 Command: "JOIN",
225 Params: []string{dc.marshalChannel(uc, ch)},
226 })
227 })
[19]228 }
[69]229 case "PART":
230 if msg.Prefix == nil {
231 return fmt.Errorf("expected a prefix")
232 }
[34]233
[43]234 var channels string
235 if err := parseMessageParams(msg, &channels); err != nil {
236 return err
[34]237 }
238
[43]239 for _, ch := range strings.Split(channels, ",") {
[55]240 if msg.Prefix.Name == uc.nick {
241 uc.logger.Printf("parted channel %q", ch)
242 delete(uc.channels, ch)
[34]243 } else {
[55]244 ch, err := uc.getChannel(ch)
[34]245 if err != nil {
246 return err
247 }
248 delete(ch.Members, msg.Prefix.Name)
249 }
[69]250
251 uc.user.forEachDownstream(func(dc *downstreamConn) {
252 dc.SendMessage(&irc.Message{
253 Prefix: dc.marshalUserPrefix(uc, msg.Prefix),
254 Command: "PART",
255 Params: []string{dc.marshalChannel(uc, ch)},
256 })
257 })
[34]258 }
[19]259 case irc.RPL_TOPIC, irc.RPL_NOTOPIC:
[43]260 var name, topic string
261 if err := parseMessageParams(msg, nil, &name, &topic); err != nil {
262 return err
[19]263 }
[55]264 ch, err := uc.getChannel(name)
[19]265 if err != nil {
266 return err
267 }
268 if msg.Command == irc.RPL_TOPIC {
[43]269 ch.Topic = topic
[19]270 } else {
271 ch.Topic = ""
272 }
273 case "TOPIC":
[43]274 var name string
275 if err := parseMessageParams(msg, nil, &name); err != nil {
276 return err
[19]277 }
[55]278 ch, err := uc.getChannel(name)
[19]279 if err != nil {
280 return err
281 }
282 if len(msg.Params) > 1 {
283 ch.Topic = msg.Params[1]
284 } else {
285 ch.Topic = ""
286 }
287 case rpl_topicwhotime:
[43]288 var name, who, timeStr string
289 if err := parseMessageParams(msg, nil, &name, &who, &timeStr); err != nil {
290 return err
[19]291 }
[55]292 ch, err := uc.getChannel(name)
[19]293 if err != nil {
294 return err
295 }
[43]296 ch.TopicWho = who
297 sec, err := strconv.ParseInt(timeStr, 10, 64)
[19]298 if err != nil {
299 return fmt.Errorf("failed to parse topic time: %v", err)
300 }
301 ch.TopicTime = time.Unix(sec, 0)
302 case irc.RPL_NAMREPLY:
[43]303 var name, statusStr, members string
304 if err := parseMessageParams(msg, nil, &statusStr, &name, &members); err != nil {
305 return err
[19]306 }
[55]307 ch, err := uc.getChannel(name)
[19]308 if err != nil {
309 return err
310 }
311
[43]312 status, err := parseChannelStatus(statusStr)
[19]313 if err != nil {
314 return err
315 }
316 ch.Status = status
317
[43]318 for _, s := range strings.Split(members, " ") {
[19]319 membership, nick := parseMembershipPrefix(s)
320 ch.Members[nick] = membership
321 }
322 case irc.RPL_ENDOFNAMES:
[43]323 var name string
324 if err := parseMessageParams(msg, nil, &name); err != nil {
325 return err
[25]326 }
[55]327 ch, err := uc.getChannel(name)
[25]328 if err != nil {
329 return err
330 }
331
[34]332 if ch.complete {
333 return fmt.Errorf("received unexpected RPL_ENDOFNAMES")
334 }
[25]335 ch.complete = true
[27]336
[55]337 uc.user.forEachDownstream(func(dc *downstreamConn) {
[27]338 forwardChannel(dc, ch)
[40]339 })
[36]340 case "PRIVMSG":
[69]341 if err := parseMessageParams(msg, nil, nil); err != nil {
342 return err
343 }
[55]344 uc.ring.Produce(msg)
[16]345 case irc.RPL_YOURHOST, irc.RPL_CREATED:
[14]346 // Ignore
347 case irc.RPL_LUSERCLIENT, irc.RPL_LUSEROP, irc.RPL_LUSERUNKNOWN, irc.RPL_LUSERCHANNELS, irc.RPL_LUSERME:
348 // Ignore
349 case irc.RPL_MOTDSTART, irc.RPL_MOTD, irc.RPL_ENDOFMOTD:
350 // Ignore
351 case rpl_localusers, rpl_globalusers:
352 // Ignore
353 case irc.RPL_STATSVLINE, irc.RPL_STATSPING, irc.RPL_STATSBLINE, irc.RPL_STATSDLINE:
354 // Ignore
[13]355 default:
[55]356 uc.logger.Printf("unhandled upstream message: %v", msg)
[13]357 }
[14]358 return nil
[13]359}
360
[55]361func (uc *upstreamConn) register() {
362 uc.nick = uc.upstream.Nick
[60]363 uc.SendMessage(&irc.Message{
[13]364 Command: "NICK",
[69]365 Params: []string{uc.nick},
[60]366 })
367 uc.SendMessage(&irc.Message{
[13]368 Command: "USER",
[55]369 Params: []string{uc.upstream.Username, "0", "*", uc.upstream.Realname},
[60]370 })
[44]371}
[13]372
[55]373func (uc *upstreamConn) readMessages() error {
[13]374 for {
[55]375 msg, err := uc.irc.ReadMessage()
[13]376 if err == io.EOF {
377 break
378 } else if err != nil {
379 return fmt.Errorf("failed to read IRC command: %v", err)
380 }
381
[64]382 if uc.srv.Debug {
383 uc.logger.Printf("received: %v", msg)
384 }
385
[55]386 if err := uc.handleMessage(msg); err != nil {
387 uc.logger.Printf("failed to handle message %q: %v", msg, err)
[13]388 }
389 }
390
[45]391 return nil
[13]392}
[60]393
394func (uc *upstreamConn) SendMessage(msg *irc.Message) {
395 uc.messages <- msg
396}
Note: See TracBrowser for help on using the repository browser.