source: code/trunk/upstream.go@ 41

Last change on this file since 41 was 40, checked in by contact, 5 years ago

Add user.forEachDownstream

File size: 7.0 KB
Line 
1package jounce
2
3import (
4 "crypto/tls"
5 "fmt"
6 "io"
7 "net"
8 "strconv"
9 "strings"
10 "time"
11
12 "gopkg.in/irc.v3"
13)
14
15type upstreamChannel struct {
16 Name string
17 Topic string
18 TopicWho string
19 TopicTime time.Time
20 Status channelStatus
21 modes modeSet
22 Members map[string]membership
23 complete bool
24}
25
26type upstreamConn struct {
27 upstream *Upstream
28 logger Logger
29 net net.Conn
30 irc *irc.Conn
31 srv *Server
32 user *user
33 messages chan<- *irc.Message
34
35 serverName string
36 availableUserModes string
37 availableChannelModes string
38 channelModesWithParam string
39
40 registered bool
41 closed bool
42 modes modeSet
43 channels map[string]*upstreamChannel
44}
45
46func connectToUpstream(u *user, upstream *Upstream) (*upstreamConn, error) {
47 logger := &prefixLogger{u.srv.Logger, fmt.Sprintf("upstream %q: ", upstream.Addr)}
48 logger.Printf("connecting to server")
49
50 netConn, err := tls.Dial("tcp", upstream.Addr, nil)
51 if err != nil {
52 return nil, fmt.Errorf("failed to dial %q: %v", upstream.Addr, err)
53 }
54
55 msgs := make(chan *irc.Message, 64)
56 conn := &upstreamConn{
57 upstream: upstream,
58 logger: logger,
59 net: netConn,
60 irc: irc.NewConn(netConn),
61 srv: u.srv,
62 user: u,
63 messages: msgs,
64 channels: make(map[string]*upstreamChannel),
65 }
66
67 go func() {
68 for msg := range msgs {
69 if err := conn.irc.WriteMessage(msg); err != nil {
70 conn.logger.Printf("failed to write message: %v", err)
71 }
72 }
73 }()
74
75 return conn, nil
76}
77
78func (c *upstreamConn) Close() error {
79 if c.closed {
80 return fmt.Errorf("upstream connection already closed")
81 }
82 if err := c.net.Close(); err != nil {
83 return err
84 }
85 close(c.messages)
86 c.closed = true
87 return nil
88}
89
90func (c *upstreamConn) getChannel(name string) (*upstreamChannel, error) {
91 ch, ok := c.channels[name]
92 if !ok {
93 return nil, fmt.Errorf("unknown channel %q", name)
94 }
95 return ch, nil
96}
97
98func (c *upstreamConn) handleMessage(msg *irc.Message) error {
99 switch msg.Command {
100 case "PING":
101 // TODO: handle params
102 c.messages <- &irc.Message{
103 Command: "PONG",
104 Params: []string{c.srv.Hostname},
105 }
106 return nil
107 case "MODE":
108 if len(msg.Params) < 2 {
109 return newNeedMoreParamsError(msg.Command)
110 }
111 name := msg.Params[0]
112 modeStr := msg.Params[1]
113
114 if name == msg.Prefix.Name { // user mode change
115 if name != c.upstream.Nick {
116 return fmt.Errorf("received MODE message for unknow nick %q", name)
117 }
118 return c.modes.Apply(modeStr)
119 } else { // channel mode change
120 ch, err := c.getChannel(name)
121 if err != nil {
122 return err
123 }
124 if err := ch.modes.Apply(modeStr); err != nil {
125 return err
126 }
127
128 c.user.forEachDownstream(func(dc *downstreamConn) {
129 dc.messages <- msg
130 })
131 }
132 case "NOTICE":
133 c.logger.Print(msg)
134 case irc.RPL_WELCOME:
135 c.registered = true
136 c.logger.Printf("connection registered")
137
138 for _, ch := range c.upstream.Channels {
139 c.messages <- &irc.Message{
140 Command: "JOIN",
141 Params: []string{ch},
142 }
143 }
144 case irc.RPL_MYINFO:
145 if len(msg.Params) < 5 {
146 return newNeedMoreParamsError(msg.Command)
147 }
148 c.serverName = msg.Params[1]
149 c.availableUserModes = msg.Params[3]
150 c.availableChannelModes = msg.Params[4]
151 if len(msg.Params) > 5 {
152 c.channelModesWithParam = msg.Params[5]
153 }
154 case "JOIN":
155 if len(msg.Params) < 1 {
156 return newNeedMoreParamsError(msg.Command)
157 }
158
159 for _, ch := range strings.Split(msg.Params[0], ",") {
160 if msg.Prefix.Name == c.upstream.Nick {
161 c.logger.Printf("joined channel %q", ch)
162 c.channels[ch] = &upstreamChannel{
163 Name: ch,
164 Members: make(map[string]membership),
165 }
166 } else {
167 ch, err := c.getChannel(ch)
168 if err != nil {
169 return err
170 }
171 ch.Members[msg.Prefix.Name] = 0
172 }
173 }
174
175 c.user.forEachDownstream(func(dc *downstreamConn) {
176 dc.messages <- msg
177 })
178 case "PART":
179 if len(msg.Params) < 1 {
180 return newNeedMoreParamsError(msg.Command)
181 }
182
183 for _, ch := range strings.Split(msg.Params[0], ",") {
184 if msg.Prefix.Name == c.upstream.Nick {
185 c.logger.Printf("parted channel %q", ch)
186 delete(c.channels, ch)
187 } else {
188 ch, err := c.getChannel(ch)
189 if err != nil {
190 return err
191 }
192 delete(ch.Members, msg.Prefix.Name)
193 }
194 }
195
196 c.user.forEachDownstream(func(dc *downstreamConn) {
197 dc.messages <- msg
198 })
199 case irc.RPL_TOPIC, irc.RPL_NOTOPIC:
200 if len(msg.Params) < 3 {
201 return newNeedMoreParamsError(msg.Command)
202 }
203 ch, err := c.getChannel(msg.Params[1])
204 if err != nil {
205 return err
206 }
207 if msg.Command == irc.RPL_TOPIC {
208 ch.Topic = msg.Params[2]
209 } else {
210 ch.Topic = ""
211 }
212 case "TOPIC":
213 if len(msg.Params) < 1 {
214 return newNeedMoreParamsError(msg.Command)
215 }
216 ch, err := c.getChannel(msg.Params[0])
217 if err != nil {
218 return err
219 }
220 if len(msg.Params) > 1 {
221 ch.Topic = msg.Params[1]
222 } else {
223 ch.Topic = ""
224 }
225 case rpl_topicwhotime:
226 if len(msg.Params) < 4 {
227 return newNeedMoreParamsError(msg.Command)
228 }
229 ch, err := c.getChannel(msg.Params[1])
230 if err != nil {
231 return err
232 }
233 ch.TopicWho = msg.Params[2]
234 sec, err := strconv.ParseInt(msg.Params[3], 10, 64)
235 if err != nil {
236 return fmt.Errorf("failed to parse topic time: %v", err)
237 }
238 ch.TopicTime = time.Unix(sec, 0)
239 case irc.RPL_NAMREPLY:
240 if len(msg.Params) < 4 {
241 return newNeedMoreParamsError(msg.Command)
242 }
243 ch, err := c.getChannel(msg.Params[2])
244 if err != nil {
245 return err
246 }
247
248 status, err := parseChannelStatus(msg.Params[1])
249 if err != nil {
250 return err
251 }
252 ch.Status = status
253
254 for _, s := range strings.Split(msg.Params[3], " ") {
255 membership, nick := parseMembershipPrefix(s)
256 ch.Members[nick] = membership
257 }
258 case irc.RPL_ENDOFNAMES:
259 if len(msg.Params) < 2 {
260 return newNeedMoreParamsError(msg.Command)
261 }
262 ch, err := c.getChannel(msg.Params[1])
263 if err != nil {
264 return err
265 }
266
267 if ch.complete {
268 return fmt.Errorf("received unexpected RPL_ENDOFNAMES")
269 }
270 ch.complete = true
271
272 c.user.forEachDownstream(func(dc *downstreamConn) {
273 forwardChannel(dc, ch)
274 })
275 case "PRIVMSG":
276 c.user.forEachDownstream(func(dc *downstreamConn) {
277 dc.messages <- msg
278 })
279 case irc.RPL_YOURHOST, irc.RPL_CREATED:
280 // Ignore
281 case irc.RPL_LUSERCLIENT, irc.RPL_LUSEROP, irc.RPL_LUSERUNKNOWN, irc.RPL_LUSERCHANNELS, irc.RPL_LUSERME:
282 // Ignore
283 case irc.RPL_MOTDSTART, irc.RPL_MOTD, irc.RPL_ENDOFMOTD:
284 // Ignore
285 case rpl_localusers, rpl_globalusers:
286 // Ignore
287 case irc.RPL_STATSVLINE, irc.RPL_STATSPING, irc.RPL_STATSBLINE, irc.RPL_STATSDLINE:
288 // Ignore
289 default:
290 c.logger.Printf("unhandled upstream message: %v", msg)
291 }
292 return nil
293}
294
295func (c *upstreamConn) readMessages() error {
296 defer c.Close()
297
298 c.messages <- &irc.Message{
299 Command: "NICK",
300 Params: []string{c.upstream.Nick},
301 }
302
303 c.messages <- &irc.Message{
304 Command: "USER",
305 Params: []string{c.upstream.Username, "0", "*", c.upstream.Realname},
306 }
307
308 for {
309 msg, err := c.irc.ReadMessage()
310 if err == io.EOF {
311 break
312 } else if err != nil {
313 return fmt.Errorf("failed to read IRC command: %v", err)
314 }
315
316 if err := c.handleMessage(msg); err != nil {
317 c.logger.Printf("failed to handle message %q: %v", msg, err)
318 }
319 }
320
321 return c.Close()
322}
Note: See TracBrowser for help on using the repository browser.