source: code/trunk/upstream.go@ 57

Last change on this file since 57 was 57, checked in by contact, 5 years ago

Fix issues related to Ring

  • RingConsumer is now used directly in the goroutine responsible for writing downstream messages. This way, a message is not consumed from the ring buffer when writing it fails.
  • RingConsumer now has a channel attached. This allows PRIVMSG messages to always use RingConsumer, instead of also directly pushing messages to all downstream connections.
  • Multiple clients with the same history name are now supported.
  • Ring is now protected by a mutex.
File size: 7.9 KB
RevLine 
[13]1package jounce
2
3import (
4 "crypto/tls"
5 "fmt"
6 "io"
7 "net"
[19]8 "strconv"
[17]9 "strings"
[19]10 "time"
[13]11
12 "gopkg.in/irc.v3"
13)
14
[19]15type upstreamChannel struct {
16 Name string
[46]17 conn *upstreamConn
[19]18 Topic string
19 TopicWho string
20 TopicTime time.Time
21 Status channelStatus
[35]22 modes modeSet
[19]23 Members map[string]membership
[25]24 complete bool
[19]25}
26
[13]27type upstreamConn struct {
[19]28 upstream *Upstream
[21]29 logger Logger
[19]30 net net.Conn
31 irc *irc.Conn
32 srv *Server
[37]33 user *user
[33]34 messages chan<- *irc.Message
[50]35 ring *Ring
[16]36
37 serverName string
38 availableUserModes string
39 availableChannelModes string
40 channelModesWithParam string
[19]41
42 registered bool
[42]43 nick string
[33]44 closed bool
[19]45 modes modeSet
46 channels map[string]*upstreamChannel
[57]47 history map[string]uint64
[13]48}
49
[37]50func connectToUpstream(u *user, upstream *Upstream) (*upstreamConn, error) {
51 logger := &prefixLogger{u.srv.Logger, fmt.Sprintf("upstream %q: ", upstream.Addr)}
[33]52 logger.Printf("connecting to server")
53
54 netConn, err := tls.Dial("tcp", upstream.Addr, nil)
55 if err != nil {
56 return nil, fmt.Errorf("failed to dial %q: %v", upstream.Addr, err)
57 }
58
59 msgs := make(chan *irc.Message, 64)
[55]60 uc := &upstreamConn{
[33]61 upstream: upstream,
62 logger: logger,
63 net: netConn,
64 irc: irc.NewConn(netConn),
[37]65 srv: u.srv,
66 user: u,
[33]67 messages: msgs,
[50]68 ring: NewRing(u.srv.RingCap),
[33]69 channels: make(map[string]*upstreamChannel),
[57]70 history: make(map[string]uint64),
[33]71 }
72
73 go func() {
74 for msg := range msgs {
[55]75 if err := uc.irc.WriteMessage(msg); err != nil {
76 uc.logger.Printf("failed to write message: %v", err)
[33]77 }
78 }
[55]79 if err := uc.net.Close(); err != nil {
80 uc.logger.Printf("failed to close connection: %v", err)
[45]81 } else {
[55]82 uc.logger.Printf("connection closed")
[45]83 }
[33]84 }()
85
[55]86 return uc, nil
[33]87}
88
[55]89func (uc *upstreamConn) Close() error {
90 if uc.closed {
[33]91 return fmt.Errorf("upstream connection already closed")
92 }
[55]93 close(uc.messages)
94 uc.closed = true
[33]95 return nil
96}
97
[55]98func (uc *upstreamConn) getChannel(name string) (*upstreamChannel, error) {
99 ch, ok := uc.channels[name]
[19]100 if !ok {
101 return nil, fmt.Errorf("unknown channel %q", name)
102 }
103 return ch, nil
104}
105
[55]106func (uc *upstreamConn) handleMessage(msg *irc.Message) error {
[13]107 switch msg.Command {
108 case "PING":
109 // TODO: handle params
[55]110 uc.messages <- &irc.Message{
[13]111 Command: "PONG",
[55]112 Params: []string{uc.srv.Hostname},
[33]113 }
114 return nil
[17]115 case "MODE":
[43]116 var name, modeStr string
117 if err := parseMessageParams(msg, &name, &modeStr); err != nil {
118 return err
[17]119 }
[35]120
121 if name == msg.Prefix.Name { // user mode change
[55]122 if name != uc.nick {
[35]123 return fmt.Errorf("received MODE message for unknow nick %q", name)
124 }
[55]125 return uc.modes.Apply(modeStr)
[35]126 } else { // channel mode change
[55]127 ch, err := uc.getChannel(name)
[35]128 if err != nil {
129 return err
130 }
131 if err := ch.modes.Apply(modeStr); err != nil {
132 return err
133 }
[46]134 }
[35]135
[55]136 uc.user.forEachDownstream(func(dc *downstreamConn) {
[54]137 dc.SendMessage(msg)
[46]138 })
[18]139 case "NOTICE":
[55]140 uc.logger.Print(msg)
[14]141 case irc.RPL_WELCOME:
[55]142 uc.registered = true
143 uc.logger.Printf("connection registered")
[19]144
[55]145 for _, ch := range uc.upstream.Channels {
146 uc.messages <- &irc.Message{
[19]147 Command: "JOIN",
148 Params: []string{ch},
149 }
150 }
[16]151 case irc.RPL_MYINFO:
[55]152 if err := parseMessageParams(msg, nil, &uc.serverName, nil, &uc.availableUserModes, &uc.availableChannelModes); err != nil {
[43]153 return err
[16]154 }
155 if len(msg.Params) > 5 {
[55]156 uc.channelModesWithParam = msg.Params[5]
[16]157 }
[42]158 case "NICK":
[43]159 var newNick string
160 if err := parseMessageParams(msg, &newNick); err != nil {
161 return err
[42]162 }
163
[55]164 if msg.Prefix.Name == uc.nick {
165 uc.logger.Printf("changed nick from %q to %q", uc.nick, newNick)
166 uc.nick = newNick
[42]167 }
168
[55]169 for _, ch := range uc.channels {
[42]170 if membership, ok := ch.Members[msg.Prefix.Name]; ok {
171 delete(ch.Members, msg.Prefix.Name)
172 ch.Members[newNick] = membership
173 }
174 }
175
[55]176 uc.user.forEachDownstream(func(dc *downstreamConn) {
[54]177 dc.SendMessage(msg)
[42]178 })
[19]179 case "JOIN":
[43]180 var channels string
181 if err := parseMessageParams(msg, &channels); err != nil {
182 return err
[19]183 }
[34]184
[43]185 for _, ch := range strings.Split(channels, ",") {
[55]186 if msg.Prefix.Name == uc.nick {
187 uc.logger.Printf("joined channel %q", ch)
188 uc.channels[ch] = &upstreamChannel{
[34]189 Name: ch,
[55]190 conn: uc,
[34]191 Members: make(map[string]membership),
192 }
193 } else {
[55]194 ch, err := uc.getChannel(ch)
[34]195 if err != nil {
196 return err
197 }
198 ch.Members[msg.Prefix.Name] = 0
[19]199 }
200 }
[34]201
[55]202 uc.user.forEachDownstream(func(dc *downstreamConn) {
[54]203 dc.SendMessage(msg)
[40]204 })
[34]205 case "PART":
[43]206 var channels string
207 if err := parseMessageParams(msg, &channels); err != nil {
208 return err
[34]209 }
210
[43]211 for _, ch := range strings.Split(channels, ",") {
[55]212 if msg.Prefix.Name == uc.nick {
213 uc.logger.Printf("parted channel %q", ch)
214 delete(uc.channels, ch)
[34]215 } else {
[55]216 ch, err := uc.getChannel(ch)
[34]217 if err != nil {
218 return err
219 }
220 delete(ch.Members, msg.Prefix.Name)
221 }
222 }
223
[55]224 uc.user.forEachDownstream(func(dc *downstreamConn) {
[54]225 dc.SendMessage(msg)
[40]226 })
[19]227 case irc.RPL_TOPIC, irc.RPL_NOTOPIC:
[43]228 var name, topic string
229 if err := parseMessageParams(msg, nil, &name, &topic); err != nil {
230 return err
[19]231 }
[55]232 ch, err := uc.getChannel(name)
[19]233 if err != nil {
234 return err
235 }
236 if msg.Command == irc.RPL_TOPIC {
[43]237 ch.Topic = topic
[19]238 } else {
239 ch.Topic = ""
240 }
241 case "TOPIC":
[43]242 var name string
243 if err := parseMessageParams(msg, nil, &name); err != nil {
244 return err
[19]245 }
[55]246 ch, err := uc.getChannel(name)
[19]247 if err != nil {
248 return err
249 }
250 if len(msg.Params) > 1 {
251 ch.Topic = msg.Params[1]
252 } else {
253 ch.Topic = ""
254 }
255 case rpl_topicwhotime:
[43]256 var name, who, timeStr string
257 if err := parseMessageParams(msg, nil, &name, &who, &timeStr); err != nil {
258 return err
[19]259 }
[55]260 ch, err := uc.getChannel(name)
[19]261 if err != nil {
262 return err
263 }
[43]264 ch.TopicWho = who
265 sec, err := strconv.ParseInt(timeStr, 10, 64)
[19]266 if err != nil {
267 return fmt.Errorf("failed to parse topic time: %v", err)
268 }
269 ch.TopicTime = time.Unix(sec, 0)
270 case irc.RPL_NAMREPLY:
[43]271 var name, statusStr, members string
272 if err := parseMessageParams(msg, nil, &statusStr, &name, &members); err != nil {
273 return err
[19]274 }
[55]275 ch, err := uc.getChannel(name)
[19]276 if err != nil {
277 return err
278 }
279
[43]280 status, err := parseChannelStatus(statusStr)
[19]281 if err != nil {
282 return err
283 }
284 ch.Status = status
285
[43]286 for _, s := range strings.Split(members, " ") {
[19]287 membership, nick := parseMembershipPrefix(s)
288 ch.Members[nick] = membership
289 }
290 case irc.RPL_ENDOFNAMES:
[43]291 var name string
292 if err := parseMessageParams(msg, nil, &name); err != nil {
293 return err
[25]294 }
[55]295 ch, err := uc.getChannel(name)
[25]296 if err != nil {
297 return err
298 }
299
[34]300 if ch.complete {
301 return fmt.Errorf("received unexpected RPL_ENDOFNAMES")
302 }
[25]303 ch.complete = true
[27]304
[55]305 uc.user.forEachDownstream(func(dc *downstreamConn) {
[27]306 forwardChannel(dc, ch)
[40]307 })
[36]308 case "PRIVMSG":
[55]309 uc.ring.Produce(msg)
[16]310 case irc.RPL_YOURHOST, irc.RPL_CREATED:
[14]311 // Ignore
312 case irc.RPL_LUSERCLIENT, irc.RPL_LUSEROP, irc.RPL_LUSERUNKNOWN, irc.RPL_LUSERCHANNELS, irc.RPL_LUSERME:
313 // Ignore
314 case irc.RPL_MOTDSTART, irc.RPL_MOTD, irc.RPL_ENDOFMOTD:
315 // Ignore
316 case rpl_localusers, rpl_globalusers:
317 // Ignore
318 case irc.RPL_STATSVLINE, irc.RPL_STATSPING, irc.RPL_STATSBLINE, irc.RPL_STATSDLINE:
319 // Ignore
[13]320 default:
[55]321 uc.logger.Printf("unhandled upstream message: %v", msg)
[13]322 }
[14]323 return nil
[13]324}
325
[55]326func (uc *upstreamConn) register() {
327 uc.nick = uc.upstream.Nick
328 uc.messages <- &irc.Message{
[13]329 Command: "NICK",
[55]330 Params: []string{uc.upstream.Nick},
[13]331 }
[55]332 uc.messages <- &irc.Message{
[13]333 Command: "USER",
[55]334 Params: []string{uc.upstream.Username, "0", "*", uc.upstream.Realname},
[13]335 }
[44]336}
[13]337
[55]338func (uc *upstreamConn) readMessages() error {
[13]339 for {
[55]340 msg, err := uc.irc.ReadMessage()
[13]341 if err == io.EOF {
342 break
343 } else if err != nil {
344 return fmt.Errorf("failed to read IRC command: %v", err)
345 }
346
[55]347 if err := uc.handleMessage(msg); err != nil {
348 uc.logger.Printf("failed to handle message %q: %v", msg, err)
[13]349 }
350 }
351
[45]352 return nil
[13]353}
Note: See TracBrowser for help on using the repository browser.