source: code/trunk/user.go@ 245

Last change on this file since 245 was 241, checked in by contact, 5 years ago

Remove Ring.Close

This is unused.

File size: 7.9 KB
Line 
1package soju
2
3import (
4 "fmt"
5 "time"
6
7 "gopkg.in/irc.v3"
8)
9
10type event interface{}
11
12type eventUpstreamMessage struct {
13 msg *irc.Message
14 uc *upstreamConn
15}
16
17type eventUpstreamConnectionError struct {
18 net *network
19 err error
20}
21
22type eventUpstreamConnected struct {
23 uc *upstreamConn
24}
25
26type eventUpstreamDisconnected struct {
27 uc *upstreamConn
28}
29
30type eventUpstreamError struct {
31 uc *upstreamConn
32 err error
33}
34
35type eventDownstreamMessage struct {
36 msg *irc.Message
37 dc *downstreamConn
38}
39
40type eventDownstreamConnected struct {
41 dc *downstreamConn
42}
43
44type eventDownstreamDisconnected struct {
45 dc *downstreamConn
46}
47
48type network struct {
49 Network
50 user *user
51 ring *Ring
52 stopped chan struct{}
53
54 conn *upstreamConn
55 history map[string]uint64
56 lastError error
57}
58
59func newNetwork(user *user, record *Network) *network {
60 return &network{
61 Network: *record,
62 user: user,
63 ring: NewRing(user.srv.RingCap),
64 stopped: make(chan struct{}),
65 history: make(map[string]uint64),
66 }
67}
68
69func (net *network) forEachDownstream(f func(*downstreamConn)) {
70 net.user.forEachDownstream(func(dc *downstreamConn) {
71 if dc.network != nil && dc.network != net {
72 return
73 }
74 f(dc)
75 })
76}
77
78func (net *network) run() {
79 var lastTry time.Time
80 for {
81 select {
82 case <-net.stopped:
83 return
84 default:
85 // This space is intentionally left blank
86 }
87
88 if dur := time.Now().Sub(lastTry); dur < retryConnectMinDelay {
89 delay := retryConnectMinDelay - dur
90 net.user.srv.Logger.Printf("waiting %v before trying to reconnect to %q", delay.Truncate(time.Second), net.Addr)
91 time.Sleep(delay)
92 }
93 lastTry = time.Now()
94
95 uc, err := connectToUpstream(net)
96 if err != nil {
97 net.user.srv.Logger.Printf("failed to connect to upstream server %q: %v", net.Addr, err)
98 net.user.events <- eventUpstreamConnectionError{net, fmt.Errorf("failed to connect: %v", err)}
99 continue
100 }
101
102 uc.register()
103 if err := uc.runUntilRegistered(); err != nil {
104 uc.logger.Printf("failed to register: %v", err)
105 net.user.events <- eventUpstreamConnectionError{net, fmt.Errorf("failed to register: %v", err)}
106 uc.Close()
107 continue
108 }
109
110 net.user.events <- eventUpstreamConnected{uc}
111 if err := uc.readMessages(net.user.events); err != nil {
112 uc.logger.Printf("failed to handle messages: %v", err)
113 net.user.events <- eventUpstreamError{uc, fmt.Errorf("failed to handle messages: %v", err)}
114 }
115 uc.Close()
116 net.user.events <- eventUpstreamDisconnected{uc}
117 }
118}
119
120func (net *network) upstream() *upstreamConn {
121 return net.conn
122}
123
124func (net *network) Stop() {
125 select {
126 case <-net.stopped:
127 return
128 default:
129 close(net.stopped)
130 }
131
132 if uc := net.upstream(); uc != nil {
133 uc.Close()
134 }
135}
136
137func (net *network) createUpdateChannel(ch *Channel) error {
138 if dbCh, err := net.user.srv.db.GetChannel(net.ID, ch.Name); err == nil {
139 ch.ID = dbCh.ID
140 } else if err != ErrNoSuchChannel {
141 return err
142 }
143 return net.user.srv.db.StoreChannel(net.ID, ch)
144}
145
146func (net *network) deleteChannel(name string) error {
147 return net.user.srv.db.DeleteChannel(net.ID, name)
148}
149
150type user struct {
151 User
152 srv *Server
153
154 events chan event
155
156 networks []*network
157 downstreamConns []*downstreamConn
158
159 // LIST commands in progress
160 pendingLISTs []pendingLIST
161}
162
163type pendingLIST struct {
164 downstreamID uint64
165 // list of per-upstream LIST commands not yet sent or completed
166 pendingCommands map[int64]*irc.Message
167}
168
169func newUser(srv *Server, record *User) *user {
170 return &user{
171 User: *record,
172 srv: srv,
173 events: make(chan event, 64),
174 }
175}
176
177func (u *user) forEachNetwork(f func(*network)) {
178 for _, network := range u.networks {
179 f(network)
180 }
181}
182
183func (u *user) forEachUpstream(f func(uc *upstreamConn)) {
184 for _, network := range u.networks {
185 uc := network.upstream()
186 if uc == nil {
187 continue
188 }
189 f(uc)
190 }
191}
192
193func (u *user) forEachDownstream(f func(dc *downstreamConn)) {
194 for _, dc := range u.downstreamConns {
195 f(dc)
196 }
197}
198
199func (u *user) getNetwork(name string) *network {
200 for _, network := range u.networks {
201 if network.Addr == name {
202 return network
203 }
204 if network.Name != "" && network.Name == name {
205 return network
206 }
207 }
208 return nil
209}
210
211func (u *user) run() {
212 networks, err := u.srv.db.ListNetworks(u.Username)
213 if err != nil {
214 u.srv.Logger.Printf("failed to list networks for user %q: %v", u.Username, err)
215 return
216 }
217
218 for _, record := range networks {
219 network := newNetwork(u, &record)
220 u.networks = append(u.networks, network)
221
222 go network.run()
223 }
224
225 for e := range u.events {
226 switch e := e.(type) {
227 case eventUpstreamConnected:
228 uc := e.uc
229
230 uc.network.conn = uc
231
232 uc.updateAway()
233
234 uc.forEachDownstream(func(dc *downstreamConn) {
235 sendServiceNOTICE(dc, fmt.Sprintf("connected to %s", uc.network.GetName()))
236 })
237 uc.network.lastError = nil
238 case eventUpstreamDisconnected:
239 uc := e.uc
240
241 uc.network.conn = nil
242
243 for _, ml := range uc.messageLoggers {
244 if err := ml.Close(); err != nil {
245 uc.logger.Printf("failed to close message logger: %v", err)
246 }
247 }
248
249 uc.endPendingLISTs(true)
250
251 if uc.network.lastError == nil {
252 uc.forEachDownstream(func(dc *downstreamConn) {
253 sendServiceNOTICE(dc, fmt.Sprintf("disconnected from %s", uc.network.GetName()))
254 })
255 }
256 case eventUpstreamConnectionError:
257 net := e.net
258
259 if net.lastError == nil || net.lastError.Error() != e.err.Error() {
260 net.forEachDownstream(func(dc *downstreamConn) {
261 sendServiceNOTICE(dc, fmt.Sprintf("failed connecting/registering to %s: %v", net.GetName(), e.err))
262 })
263 }
264 net.lastError = e.err
265 case eventUpstreamError:
266 uc := e.uc
267
268 uc.forEachDownstream(func(dc *downstreamConn) {
269 sendServiceNOTICE(dc, fmt.Sprintf("disconnected from %s: %v", uc.network.GetName(), e.err))
270 })
271 uc.network.lastError = e.err
272 case eventUpstreamMessage:
273 msg, uc := e.msg, e.uc
274 if uc.isClosed() {
275 uc.logger.Printf("ignoring message on closed connection: %v", msg)
276 break
277 }
278 if err := uc.handleMessage(msg); err != nil {
279 uc.logger.Printf("failed to handle message %q: %v", msg, err)
280 }
281 case eventDownstreamConnected:
282 dc := e.dc
283
284 if err := dc.welcome(); err != nil {
285 dc.logger.Printf("failed to handle new registered connection: %v", err)
286 break
287 }
288
289 u.downstreamConns = append(u.downstreamConns, dc)
290
291 u.forEachUpstream(func(uc *upstreamConn) {
292 uc.updateAway()
293 })
294 case eventDownstreamDisconnected:
295 dc := e.dc
296
297 dc.forEachNetwork(func(net *network) {
298 seq := net.ring.Cur()
299 net.history[dc.clientName] = seq
300 })
301
302 for i := range u.downstreamConns {
303 if u.downstreamConns[i] == dc {
304 u.downstreamConns = append(u.downstreamConns[:i], u.downstreamConns[i+1:]...)
305 break
306 }
307 }
308
309 u.forEachUpstream(func(uc *upstreamConn) {
310 uc.updateAway()
311 })
312 case eventDownstreamMessage:
313 msg, dc := e.msg, e.dc
314 if dc.isClosed() {
315 dc.logger.Printf("ignoring message on closed connection: %v", msg)
316 break
317 }
318 err := dc.handleMessage(msg)
319 if ircErr, ok := err.(ircError); ok {
320 ircErr.Message.Prefix = dc.srv.prefix()
321 dc.SendMessage(ircErr.Message)
322 } else if err != nil {
323 dc.logger.Printf("failed to handle message %q: %v", msg, err)
324 dc.Close()
325 }
326 default:
327 u.srv.Logger.Printf("received unknown event type: %T", e)
328 }
329 }
330}
331
332func (u *user) createNetwork(net *Network) (*network, error) {
333 if net.ID != 0 {
334 panic("tried creating an already-existing network")
335 }
336
337 network := newNetwork(u, net)
338 err := u.srv.db.StoreNetwork(u.Username, &network.Network)
339 if err != nil {
340 return nil, err
341 }
342
343 u.networks = append(u.networks, network)
344
345 go network.run()
346 return network, nil
347}
348
349func (u *user) deleteNetwork(id int64) error {
350 for i, net := range u.networks {
351 if net.ID != id {
352 continue
353 }
354
355 if err := u.srv.db.DeleteNetwork(net.ID); err != nil {
356 return err
357 }
358
359 u.forEachDownstream(func(dc *downstreamConn) {
360 if dc.network != nil && dc.network == net {
361 dc.Close()
362 }
363 })
364
365 net.Stop()
366 u.networks = append(u.networks[:i], u.networks[i+1:]...)
367 return nil
368 }
369
370 panic("tried deleting a non-existing network")
371}
Note: See TracBrowser for help on using the repository browser.