source: code/trunk/upstream.go@ 53

Last change on this file since 53 was 50, checked in by contact, 5 years ago

Add an in-memory ring buffer

References: https://todo.sr.ht/~emersion/jounce/2

File size: 7.8 KB
Line 
1package jounce
2
3import (
4 "crypto/tls"
5 "fmt"
6 "io"
7 "net"
8 "strconv"
9 "strings"
10 "time"
11
12 "gopkg.in/irc.v3"
13)
14
15type upstreamChannel struct {
16 Name string
17 conn *upstreamConn
18 Topic string
19 TopicWho string
20 TopicTime time.Time
21 Status channelStatus
22 modes modeSet
23 Members map[string]membership
24 complete bool
25}
26
27type upstreamConn struct {
28 upstream *Upstream
29 logger Logger
30 net net.Conn
31 irc *irc.Conn
32 srv *Server
33 user *user
34 messages chan<- *irc.Message
35 ring *Ring
36
37 serverName string
38 availableUserModes string
39 availableChannelModes string
40 channelModesWithParam string
41
42 registered bool
43 nick string
44 closed bool
45 modes modeSet
46 channels map[string]*upstreamChannel
47}
48
49func connectToUpstream(u *user, upstream *Upstream) (*upstreamConn, error) {
50 logger := &prefixLogger{u.srv.Logger, fmt.Sprintf("upstream %q: ", upstream.Addr)}
51 logger.Printf("connecting to server")
52
53 netConn, err := tls.Dial("tcp", upstream.Addr, nil)
54 if err != nil {
55 return nil, fmt.Errorf("failed to dial %q: %v", upstream.Addr, err)
56 }
57
58 msgs := make(chan *irc.Message, 64)
59 conn := &upstreamConn{
60 upstream: upstream,
61 logger: logger,
62 net: netConn,
63 irc: irc.NewConn(netConn),
64 srv: u.srv,
65 user: u,
66 messages: msgs,
67 ring: NewRing(u.srv.RingCap),
68 channels: make(map[string]*upstreamChannel),
69 }
70
71 go func() {
72 for msg := range msgs {
73 if err := conn.irc.WriteMessage(msg); err != nil {
74 conn.logger.Printf("failed to write message: %v", err)
75 }
76 }
77 if err := conn.net.Close(); err != nil {
78 conn.logger.Printf("failed to close connection: %v", err)
79 } else {
80 conn.logger.Printf("connection closed")
81 }
82 }()
83
84 return conn, nil
85}
86
87func (c *upstreamConn) Close() error {
88 if c.closed {
89 return fmt.Errorf("upstream connection already closed")
90 }
91 close(c.messages)
92 c.closed = true
93 return nil
94}
95
96func (c *upstreamConn) getChannel(name string) (*upstreamChannel, error) {
97 ch, ok := c.channels[name]
98 if !ok {
99 return nil, fmt.Errorf("unknown channel %q", name)
100 }
101 return ch, nil
102}
103
104func (c *upstreamConn) handleMessage(msg *irc.Message) error {
105 switch msg.Command {
106 case "PING":
107 // TODO: handle params
108 c.messages <- &irc.Message{
109 Command: "PONG",
110 Params: []string{c.srv.Hostname},
111 }
112 return nil
113 case "MODE":
114 var name, modeStr string
115 if err := parseMessageParams(msg, &name, &modeStr); err != nil {
116 return err
117 }
118
119 if name == msg.Prefix.Name { // user mode change
120 if name != c.nick {
121 return fmt.Errorf("received MODE message for unknow nick %q", name)
122 }
123 return c.modes.Apply(modeStr)
124 } else { // channel mode change
125 ch, err := c.getChannel(name)
126 if err != nil {
127 return err
128 }
129 if err := ch.modes.Apply(modeStr); err != nil {
130 return err
131 }
132 }
133
134 c.user.forEachDownstream(func(dc *downstreamConn) {
135 dc.messages <- msg
136 })
137 case "NOTICE":
138 c.logger.Print(msg)
139 case irc.RPL_WELCOME:
140 c.registered = true
141 c.logger.Printf("connection registered")
142
143 for _, ch := range c.upstream.Channels {
144 c.messages <- &irc.Message{
145 Command: "JOIN",
146 Params: []string{ch},
147 }
148 }
149 case irc.RPL_MYINFO:
150 if err := parseMessageParams(msg, nil, &c.serverName, nil, &c.availableUserModes, &c.availableChannelModes); err != nil {
151 return err
152 }
153 if len(msg.Params) > 5 {
154 c.channelModesWithParam = msg.Params[5]
155 }
156 case "NICK":
157 var newNick string
158 if err := parseMessageParams(msg, &newNick); err != nil {
159 return err
160 }
161
162 if msg.Prefix.Name == c.nick {
163 c.logger.Printf("changed nick from %q to %q", c.nick, newNick)
164 c.nick = newNick
165 }
166
167 for _, ch := range c.channels {
168 if membership, ok := ch.Members[msg.Prefix.Name]; ok {
169 delete(ch.Members, msg.Prefix.Name)
170 ch.Members[newNick] = membership
171 }
172 }
173
174 c.user.forEachDownstream(func(dc *downstreamConn) {
175 dc.messages <- msg
176 })
177 case "JOIN":
178 var channels string
179 if err := parseMessageParams(msg, &channels); err != nil {
180 return err
181 }
182
183 for _, ch := range strings.Split(channels, ",") {
184 if msg.Prefix.Name == c.nick {
185 c.logger.Printf("joined channel %q", ch)
186 c.channels[ch] = &upstreamChannel{
187 Name: ch,
188 conn: c,
189 Members: make(map[string]membership),
190 }
191 } else {
192 ch, err := c.getChannel(ch)
193 if err != nil {
194 return err
195 }
196 ch.Members[msg.Prefix.Name] = 0
197 }
198 }
199
200 c.user.forEachDownstream(func(dc *downstreamConn) {
201 dc.messages <- msg
202 })
203 case "PART":
204 var channels string
205 if err := parseMessageParams(msg, &channels); err != nil {
206 return err
207 }
208
209 for _, ch := range strings.Split(channels, ",") {
210 if msg.Prefix.Name == c.nick {
211 c.logger.Printf("parted channel %q", ch)
212 delete(c.channels, ch)
213 } else {
214 ch, err := c.getChannel(ch)
215 if err != nil {
216 return err
217 }
218 delete(ch.Members, msg.Prefix.Name)
219 }
220 }
221
222 c.user.forEachDownstream(func(dc *downstreamConn) {
223 dc.messages <- msg
224 })
225 case irc.RPL_TOPIC, irc.RPL_NOTOPIC:
226 var name, topic string
227 if err := parseMessageParams(msg, nil, &name, &topic); err != nil {
228 return err
229 }
230 ch, err := c.getChannel(name)
231 if err != nil {
232 return err
233 }
234 if msg.Command == irc.RPL_TOPIC {
235 ch.Topic = topic
236 } else {
237 ch.Topic = ""
238 }
239 case "TOPIC":
240 var name string
241 if err := parseMessageParams(msg, nil, &name); err != nil {
242 return err
243 }
244 ch, err := c.getChannel(name)
245 if err != nil {
246 return err
247 }
248 if len(msg.Params) > 1 {
249 ch.Topic = msg.Params[1]
250 } else {
251 ch.Topic = ""
252 }
253 case rpl_topicwhotime:
254 var name, who, timeStr string
255 if err := parseMessageParams(msg, nil, &name, &who, &timeStr); err != nil {
256 return err
257 }
258 ch, err := c.getChannel(name)
259 if err != nil {
260 return err
261 }
262 ch.TopicWho = who
263 sec, err := strconv.ParseInt(timeStr, 10, 64)
264 if err != nil {
265 return fmt.Errorf("failed to parse topic time: %v", err)
266 }
267 ch.TopicTime = time.Unix(sec, 0)
268 case irc.RPL_NAMREPLY:
269 var name, statusStr, members string
270 if err := parseMessageParams(msg, nil, &statusStr, &name, &members); err != nil {
271 return err
272 }
273 ch, err := c.getChannel(name)
274 if err != nil {
275 return err
276 }
277
278 status, err := parseChannelStatus(statusStr)
279 if err != nil {
280 return err
281 }
282 ch.Status = status
283
284 for _, s := range strings.Split(members, " ") {
285 membership, nick := parseMembershipPrefix(s)
286 ch.Members[nick] = membership
287 }
288 case irc.RPL_ENDOFNAMES:
289 var name string
290 if err := parseMessageParams(msg, nil, &name); err != nil {
291 return err
292 }
293 ch, err := c.getChannel(name)
294 if err != nil {
295 return err
296 }
297
298 if ch.complete {
299 return fmt.Errorf("received unexpected RPL_ENDOFNAMES")
300 }
301 ch.complete = true
302
303 c.user.forEachDownstream(func(dc *downstreamConn) {
304 forwardChannel(dc, ch)
305 })
306 case "PRIVMSG":
307 c.ring.Produce(msg)
308 c.user.forEachDownstream(func(dc *downstreamConn) {
309 dc.messages <- msg
310 })
311 case irc.RPL_YOURHOST, irc.RPL_CREATED:
312 // Ignore
313 case irc.RPL_LUSERCLIENT, irc.RPL_LUSEROP, irc.RPL_LUSERUNKNOWN, irc.RPL_LUSERCHANNELS, irc.RPL_LUSERME:
314 // Ignore
315 case irc.RPL_MOTDSTART, irc.RPL_MOTD, irc.RPL_ENDOFMOTD:
316 // Ignore
317 case rpl_localusers, rpl_globalusers:
318 // Ignore
319 case irc.RPL_STATSVLINE, irc.RPL_STATSPING, irc.RPL_STATSBLINE, irc.RPL_STATSDLINE:
320 // Ignore
321 default:
322 c.logger.Printf("unhandled upstream message: %v", msg)
323 }
324 return nil
325}
326
327func (c *upstreamConn) register() {
328 c.nick = c.upstream.Nick
329 c.messages <- &irc.Message{
330 Command: "NICK",
331 Params: []string{c.upstream.Nick},
332 }
333 c.messages <- &irc.Message{
334 Command: "USER",
335 Params: []string{c.upstream.Username, "0", "*", c.upstream.Realname},
336 }
337}
338
339func (c *upstreamConn) readMessages() error {
340 for {
341 msg, err := c.irc.ReadMessage()
342 if err == io.EOF {
343 break
344 } else if err != nil {
345 return fmt.Errorf("failed to read IRC command: %v", err)
346 }
347
348 if err := c.handleMessage(msg); err != nil {
349 c.logger.Printf("failed to handle message %q: %v", msg, err)
350 }
351 }
352
353 return nil
354}
Note: See TracBrowser for help on using the repository browser.