source: code/trunk/upstream.go@ 40

Last change on this file since 40 was 40, checked in by contact, 5 years ago

Add user.forEachDownstream

File size: 7.0 KB
RevLine 
[13]1package jounce
2
3import (
4 "crypto/tls"
5 "fmt"
6 "io"
7 "net"
[19]8 "strconv"
[17]9 "strings"
[19]10 "time"
[13]11
12 "gopkg.in/irc.v3"
13)
14
[19]15type upstreamChannel struct {
16 Name string
17 Topic string
18 TopicWho string
19 TopicTime time.Time
20 Status channelStatus
[35]21 modes modeSet
[19]22 Members map[string]membership
[25]23 complete bool
[19]24}
25
[13]26type upstreamConn struct {
[19]27 upstream *Upstream
[21]28 logger Logger
[19]29 net net.Conn
30 irc *irc.Conn
31 srv *Server
[37]32 user *user
[33]33 messages chan<- *irc.Message
[16]34
35 serverName string
36 availableUserModes string
37 availableChannelModes string
38 channelModesWithParam string
[19]39
40 registered bool
[33]41 closed bool
[19]42 modes modeSet
43 channels map[string]*upstreamChannel
[13]44}
45
[37]46func connectToUpstream(u *user, upstream *Upstream) (*upstreamConn, error) {
47 logger := &prefixLogger{u.srv.Logger, fmt.Sprintf("upstream %q: ", upstream.Addr)}
[33]48 logger.Printf("connecting to server")
49
50 netConn, err := tls.Dial("tcp", upstream.Addr, nil)
51 if err != nil {
52 return nil, fmt.Errorf("failed to dial %q: %v", upstream.Addr, err)
53 }
54
55 msgs := make(chan *irc.Message, 64)
56 conn := &upstreamConn{
57 upstream: upstream,
58 logger: logger,
59 net: netConn,
60 irc: irc.NewConn(netConn),
[37]61 srv: u.srv,
62 user: u,
[33]63 messages: msgs,
64 channels: make(map[string]*upstreamChannel),
65 }
66
67 go func() {
68 for msg := range msgs {
69 if err := conn.irc.WriteMessage(msg); err != nil {
70 conn.logger.Printf("failed to write message: %v", err)
71 }
72 }
73 }()
74
75 return conn, nil
76}
77
78func (c *upstreamConn) Close() error {
79 if c.closed {
80 return fmt.Errorf("upstream connection already closed")
81 }
82 if err := c.net.Close(); err != nil {
83 return err
84 }
85 close(c.messages)
86 c.closed = true
87 return nil
88}
89
[19]90func (c *upstreamConn) getChannel(name string) (*upstreamChannel, error) {
91 ch, ok := c.channels[name]
92 if !ok {
93 return nil, fmt.Errorf("unknown channel %q", name)
94 }
95 return ch, nil
96}
97
[13]98func (c *upstreamConn) handleMessage(msg *irc.Message) error {
99 switch msg.Command {
100 case "PING":
101 // TODO: handle params
[33]102 c.messages <- &irc.Message{
[13]103 Command: "PONG",
104 Params: []string{c.srv.Hostname},
[33]105 }
106 return nil
[17]107 case "MODE":
108 if len(msg.Params) < 2 {
109 return newNeedMoreParamsError(msg.Command)
110 }
[35]111 name := msg.Params[0]
112 modeStr := msg.Params[1]
113
114 if name == msg.Prefix.Name { // user mode change
115 if name != c.upstream.Nick {
116 return fmt.Errorf("received MODE message for unknow nick %q", name)
117 }
118 return c.modes.Apply(modeStr)
119 } else { // channel mode change
120 ch, err := c.getChannel(name)
121 if err != nil {
122 return err
123 }
124 if err := ch.modes.Apply(modeStr); err != nil {
125 return err
126 }
127
[40]128 c.user.forEachDownstream(func(dc *downstreamConn) {
[35]129 dc.messages <- msg
[40]130 })
[17]131 }
[18]132 case "NOTICE":
[21]133 c.logger.Print(msg)
[14]134 case irc.RPL_WELCOME:
135 c.registered = true
[21]136 c.logger.Printf("connection registered")
[19]137
138 for _, ch := range c.upstream.Channels {
[33]139 c.messages <- &irc.Message{
[19]140 Command: "JOIN",
141 Params: []string{ch},
142 }
143 }
[16]144 case irc.RPL_MYINFO:
145 if len(msg.Params) < 5 {
146 return newNeedMoreParamsError(msg.Command)
147 }
148 c.serverName = msg.Params[1]
149 c.availableUserModes = msg.Params[3]
150 c.availableChannelModes = msg.Params[4]
151 if len(msg.Params) > 5 {
152 c.channelModesWithParam = msg.Params[5]
153 }
[19]154 case "JOIN":
155 if len(msg.Params) < 1 {
156 return newNeedMoreParamsError(msg.Command)
157 }
[34]158
[19]159 for _, ch := range strings.Split(msg.Params[0], ",") {
[34]160 if msg.Prefix.Name == c.upstream.Nick {
161 c.logger.Printf("joined channel %q", ch)
162 c.channels[ch] = &upstreamChannel{
163 Name: ch,
164 Members: make(map[string]membership),
165 }
166 } else {
167 ch, err := c.getChannel(ch)
168 if err != nil {
169 return err
170 }
171 ch.Members[msg.Prefix.Name] = 0
[19]172 }
173 }
[34]174
[40]175 c.user.forEachDownstream(func(dc *downstreamConn) {
[34]176 dc.messages <- msg
[40]177 })
[34]178 case "PART":
179 if len(msg.Params) < 1 {
180 return newNeedMoreParamsError(msg.Command)
181 }
182
183 for _, ch := range strings.Split(msg.Params[0], ",") {
184 if msg.Prefix.Name == c.upstream.Nick {
185 c.logger.Printf("parted channel %q", ch)
186 delete(c.channels, ch)
187 } else {
188 ch, err := c.getChannel(ch)
189 if err != nil {
190 return err
191 }
192 delete(ch.Members, msg.Prefix.Name)
193 }
194 }
195
[40]196 c.user.forEachDownstream(func(dc *downstreamConn) {
[34]197 dc.messages <- msg
[40]198 })
[19]199 case irc.RPL_TOPIC, irc.RPL_NOTOPIC:
200 if len(msg.Params) < 3 {
201 return newNeedMoreParamsError(msg.Command)
202 }
203 ch, err := c.getChannel(msg.Params[1])
204 if err != nil {
205 return err
206 }
207 if msg.Command == irc.RPL_TOPIC {
208 ch.Topic = msg.Params[2]
209 } else {
210 ch.Topic = ""
211 }
212 case "TOPIC":
213 if len(msg.Params) < 1 {
214 return newNeedMoreParamsError(msg.Command)
215 }
216 ch, err := c.getChannel(msg.Params[0])
217 if err != nil {
218 return err
219 }
220 if len(msg.Params) > 1 {
221 ch.Topic = msg.Params[1]
222 } else {
223 ch.Topic = ""
224 }
225 case rpl_topicwhotime:
226 if len(msg.Params) < 4 {
227 return newNeedMoreParamsError(msg.Command)
228 }
229 ch, err := c.getChannel(msg.Params[1])
230 if err != nil {
231 return err
232 }
233 ch.TopicWho = msg.Params[2]
234 sec, err := strconv.ParseInt(msg.Params[3], 10, 64)
235 if err != nil {
236 return fmt.Errorf("failed to parse topic time: %v", err)
237 }
238 ch.TopicTime = time.Unix(sec, 0)
239 case irc.RPL_NAMREPLY:
240 if len(msg.Params) < 4 {
241 return newNeedMoreParamsError(msg.Command)
242 }
243 ch, err := c.getChannel(msg.Params[2])
244 if err != nil {
245 return err
246 }
247
248 status, err := parseChannelStatus(msg.Params[1])
249 if err != nil {
250 return err
251 }
252 ch.Status = status
253
254 for _, s := range strings.Split(msg.Params[3], " ") {
255 membership, nick := parseMembershipPrefix(s)
256 ch.Members[nick] = membership
257 }
258 case irc.RPL_ENDOFNAMES:
[25]259 if len(msg.Params) < 2 {
260 return newNeedMoreParamsError(msg.Command)
261 }
262 ch, err := c.getChannel(msg.Params[1])
263 if err != nil {
264 return err
265 }
266
[34]267 if ch.complete {
268 return fmt.Errorf("received unexpected RPL_ENDOFNAMES")
269 }
[25]270 ch.complete = true
[27]271
[40]272 c.user.forEachDownstream(func(dc *downstreamConn) {
[27]273 forwardChannel(dc, ch)
[40]274 })
[36]275 case "PRIVMSG":
[40]276 c.user.forEachDownstream(func(dc *downstreamConn) {
[36]277 dc.messages <- msg
[40]278 })
[16]279 case irc.RPL_YOURHOST, irc.RPL_CREATED:
[14]280 // Ignore
281 case irc.RPL_LUSERCLIENT, irc.RPL_LUSEROP, irc.RPL_LUSERUNKNOWN, irc.RPL_LUSERCHANNELS, irc.RPL_LUSERME:
282 // Ignore
283 case irc.RPL_MOTDSTART, irc.RPL_MOTD, irc.RPL_ENDOFMOTD:
284 // Ignore
285 case rpl_localusers, rpl_globalusers:
286 // Ignore
287 case irc.RPL_STATSVLINE, irc.RPL_STATSPING, irc.RPL_STATSBLINE, irc.RPL_STATSDLINE:
288 // Ignore
[13]289 default:
[21]290 c.logger.Printf("unhandled upstream message: %v", msg)
[13]291 }
[14]292 return nil
[13]293}
294
[23]295func (c *upstreamConn) readMessages() error {
[33]296 defer c.Close()
[13]297
[33]298 c.messages <- &irc.Message{
[13]299 Command: "NICK",
[23]300 Params: []string{c.upstream.Nick},
[13]301 }
302
[33]303 c.messages <- &irc.Message{
[13]304 Command: "USER",
[23]305 Params: []string{c.upstream.Username, "0", "*", c.upstream.Realname},
[13]306 }
307
308 for {
309 msg, err := c.irc.ReadMessage()
310 if err == io.EOF {
311 break
312 } else if err != nil {
313 return fmt.Errorf("failed to read IRC command: %v", err)
314 }
315
316 if err := c.handleMessage(msg); err != nil {
[21]317 c.logger.Printf("failed to handle message %q: %v", msg, err)
[13]318 }
319 }
320
[33]321 return c.Close()
[13]322}
Note: See TracBrowser for help on using the repository browser.