source: code/trunk/upstream.go@ 59

Last change on this file since 59 was 57, checked in by contact, 5 years ago

Fix issues related to Ring

  • RingConsumer is now used directly in the goroutine responsible for writing downstream messages, so the ring buffer is not consumed when a write fails.
  • RingConsumer now has a channel attached. This allows PRIVMSG messages to always use RingConsumer, instead of also directly pushing messages to all downstream connections.
  • Multiple clients with the same history name are now supported.
  • Ring is now protected by a mutex
File size: 7.9 KB
Line 
1package jounce
2
3import (
4 "crypto/tls"
5 "fmt"
6 "io"
7 "net"
8 "strconv"
9 "strings"
10 "time"
11
12 "gopkg.in/irc.v3"
13)
14
15type upstreamChannel struct {
16 Name string
17 conn *upstreamConn
18 Topic string
19 TopicWho string
20 TopicTime time.Time
21 Status channelStatus
22 modes modeSet
23 Members map[string]membership
24 complete bool
25}
26
27type upstreamConn struct {
28 upstream *Upstream
29 logger Logger
30 net net.Conn
31 irc *irc.Conn
32 srv *Server
33 user *user
34 messages chan<- *irc.Message
35 ring *Ring
36
37 serverName string
38 availableUserModes string
39 availableChannelModes string
40 channelModesWithParam string
41
42 registered bool
43 nick string
44 closed bool
45 modes modeSet
46 channels map[string]*upstreamChannel
47 history map[string]uint64
48}
49
50func connectToUpstream(u *user, upstream *Upstream) (*upstreamConn, error) {
51 logger := &prefixLogger{u.srv.Logger, fmt.Sprintf("upstream %q: ", upstream.Addr)}
52 logger.Printf("connecting to server")
53
54 netConn, err := tls.Dial("tcp", upstream.Addr, nil)
55 if err != nil {
56 return nil, fmt.Errorf("failed to dial %q: %v", upstream.Addr, err)
57 }
58
59 msgs := make(chan *irc.Message, 64)
60 uc := &upstreamConn{
61 upstream: upstream,
62 logger: logger,
63 net: netConn,
64 irc: irc.NewConn(netConn),
65 srv: u.srv,
66 user: u,
67 messages: msgs,
68 ring: NewRing(u.srv.RingCap),
69 channels: make(map[string]*upstreamChannel),
70 history: make(map[string]uint64),
71 }
72
73 go func() {
74 for msg := range msgs {
75 if err := uc.irc.WriteMessage(msg); err != nil {
76 uc.logger.Printf("failed to write message: %v", err)
77 }
78 }
79 if err := uc.net.Close(); err != nil {
80 uc.logger.Printf("failed to close connection: %v", err)
81 } else {
82 uc.logger.Printf("connection closed")
83 }
84 }()
85
86 return uc, nil
87}
88
89func (uc *upstreamConn) Close() error {
90 if uc.closed {
91 return fmt.Errorf("upstream connection already closed")
92 }
93 close(uc.messages)
94 uc.closed = true
95 return nil
96}
97
98func (uc *upstreamConn) getChannel(name string) (*upstreamChannel, error) {
99 ch, ok := uc.channels[name]
100 if !ok {
101 return nil, fmt.Errorf("unknown channel %q", name)
102 }
103 return ch, nil
104}
105
106func (uc *upstreamConn) handleMessage(msg *irc.Message) error {
107 switch msg.Command {
108 case "PING":
109 // TODO: handle params
110 uc.messages <- &irc.Message{
111 Command: "PONG",
112 Params: []string{uc.srv.Hostname},
113 }
114 return nil
115 case "MODE":
116 var name, modeStr string
117 if err := parseMessageParams(msg, &name, &modeStr); err != nil {
118 return err
119 }
120
121 if name == msg.Prefix.Name { // user mode change
122 if name != uc.nick {
123 return fmt.Errorf("received MODE message for unknow nick %q", name)
124 }
125 return uc.modes.Apply(modeStr)
126 } else { // channel mode change
127 ch, err := uc.getChannel(name)
128 if err != nil {
129 return err
130 }
131 if err := ch.modes.Apply(modeStr); err != nil {
132 return err
133 }
134 }
135
136 uc.user.forEachDownstream(func(dc *downstreamConn) {
137 dc.SendMessage(msg)
138 })
139 case "NOTICE":
140 uc.logger.Print(msg)
141 case irc.RPL_WELCOME:
142 uc.registered = true
143 uc.logger.Printf("connection registered")
144
145 for _, ch := range uc.upstream.Channels {
146 uc.messages <- &irc.Message{
147 Command: "JOIN",
148 Params: []string{ch},
149 }
150 }
151 case irc.RPL_MYINFO:
152 if err := parseMessageParams(msg, nil, &uc.serverName, nil, &uc.availableUserModes, &uc.availableChannelModes); err != nil {
153 return err
154 }
155 if len(msg.Params) > 5 {
156 uc.channelModesWithParam = msg.Params[5]
157 }
158 case "NICK":
159 var newNick string
160 if err := parseMessageParams(msg, &newNick); err != nil {
161 return err
162 }
163
164 if msg.Prefix.Name == uc.nick {
165 uc.logger.Printf("changed nick from %q to %q", uc.nick, newNick)
166 uc.nick = newNick
167 }
168
169 for _, ch := range uc.channels {
170 if membership, ok := ch.Members[msg.Prefix.Name]; ok {
171 delete(ch.Members, msg.Prefix.Name)
172 ch.Members[newNick] = membership
173 }
174 }
175
176 uc.user.forEachDownstream(func(dc *downstreamConn) {
177 dc.SendMessage(msg)
178 })
179 case "JOIN":
180 var channels string
181 if err := parseMessageParams(msg, &channels); err != nil {
182 return err
183 }
184
185 for _, ch := range strings.Split(channels, ",") {
186 if msg.Prefix.Name == uc.nick {
187 uc.logger.Printf("joined channel %q", ch)
188 uc.channels[ch] = &upstreamChannel{
189 Name: ch,
190 conn: uc,
191 Members: make(map[string]membership),
192 }
193 } else {
194 ch, err := uc.getChannel(ch)
195 if err != nil {
196 return err
197 }
198 ch.Members[msg.Prefix.Name] = 0
199 }
200 }
201
202 uc.user.forEachDownstream(func(dc *downstreamConn) {
203 dc.SendMessage(msg)
204 })
205 case "PART":
206 var channels string
207 if err := parseMessageParams(msg, &channels); err != nil {
208 return err
209 }
210
211 for _, ch := range strings.Split(channels, ",") {
212 if msg.Prefix.Name == uc.nick {
213 uc.logger.Printf("parted channel %q", ch)
214 delete(uc.channels, ch)
215 } else {
216 ch, err := uc.getChannel(ch)
217 if err != nil {
218 return err
219 }
220 delete(ch.Members, msg.Prefix.Name)
221 }
222 }
223
224 uc.user.forEachDownstream(func(dc *downstreamConn) {
225 dc.SendMessage(msg)
226 })
227 case irc.RPL_TOPIC, irc.RPL_NOTOPIC:
228 var name, topic string
229 if err := parseMessageParams(msg, nil, &name, &topic); err != nil {
230 return err
231 }
232 ch, err := uc.getChannel(name)
233 if err != nil {
234 return err
235 }
236 if msg.Command == irc.RPL_TOPIC {
237 ch.Topic = topic
238 } else {
239 ch.Topic = ""
240 }
241 case "TOPIC":
242 var name string
243 if err := parseMessageParams(msg, nil, &name); err != nil {
244 return err
245 }
246 ch, err := uc.getChannel(name)
247 if err != nil {
248 return err
249 }
250 if len(msg.Params) > 1 {
251 ch.Topic = msg.Params[1]
252 } else {
253 ch.Topic = ""
254 }
255 case rpl_topicwhotime:
256 var name, who, timeStr string
257 if err := parseMessageParams(msg, nil, &name, &who, &timeStr); err != nil {
258 return err
259 }
260 ch, err := uc.getChannel(name)
261 if err != nil {
262 return err
263 }
264 ch.TopicWho = who
265 sec, err := strconv.ParseInt(timeStr, 10, 64)
266 if err != nil {
267 return fmt.Errorf("failed to parse topic time: %v", err)
268 }
269 ch.TopicTime = time.Unix(sec, 0)
270 case irc.RPL_NAMREPLY:
271 var name, statusStr, members string
272 if err := parseMessageParams(msg, nil, &statusStr, &name, &members); err != nil {
273 return err
274 }
275 ch, err := uc.getChannel(name)
276 if err != nil {
277 return err
278 }
279
280 status, err := parseChannelStatus(statusStr)
281 if err != nil {
282 return err
283 }
284 ch.Status = status
285
286 for _, s := range strings.Split(members, " ") {
287 membership, nick := parseMembershipPrefix(s)
288 ch.Members[nick] = membership
289 }
290 case irc.RPL_ENDOFNAMES:
291 var name string
292 if err := parseMessageParams(msg, nil, &name); err != nil {
293 return err
294 }
295 ch, err := uc.getChannel(name)
296 if err != nil {
297 return err
298 }
299
300 if ch.complete {
301 return fmt.Errorf("received unexpected RPL_ENDOFNAMES")
302 }
303 ch.complete = true
304
305 uc.user.forEachDownstream(func(dc *downstreamConn) {
306 forwardChannel(dc, ch)
307 })
308 case "PRIVMSG":
309 uc.ring.Produce(msg)
310 case irc.RPL_YOURHOST, irc.RPL_CREATED:
311 // Ignore
312 case irc.RPL_LUSERCLIENT, irc.RPL_LUSEROP, irc.RPL_LUSERUNKNOWN, irc.RPL_LUSERCHANNELS, irc.RPL_LUSERME:
313 // Ignore
314 case irc.RPL_MOTDSTART, irc.RPL_MOTD, irc.RPL_ENDOFMOTD:
315 // Ignore
316 case rpl_localusers, rpl_globalusers:
317 // Ignore
318 case irc.RPL_STATSVLINE, irc.RPL_STATSPING, irc.RPL_STATSBLINE, irc.RPL_STATSDLINE:
319 // Ignore
320 default:
321 uc.logger.Printf("unhandled upstream message: %v", msg)
322 }
323 return nil
324}
325
326func (uc *upstreamConn) register() {
327 uc.nick = uc.upstream.Nick
328 uc.messages <- &irc.Message{
329 Command: "NICK",
330 Params: []string{uc.upstream.Nick},
331 }
332 uc.messages <- &irc.Message{
333 Command: "USER",
334 Params: []string{uc.upstream.Username, "0", "*", uc.upstream.Realname},
335 }
336}
337
338func (uc *upstreamConn) readMessages() error {
339 for {
340 msg, err := uc.irc.ReadMessage()
341 if err == io.EOF {
342 break
343 } else if err != nil {
344 return fmt.Errorf("failed to read IRC command: %v", err)
345 }
346
347 if err := uc.handleMessage(msg); err != nil {
348 uc.logger.Printf("failed to handle message %q: %v", msg, err)
349 }
350 }
351
352 return nil
353}
Note: See TracBrowser for help on using the repository browser.