source: code/trunk/user.go@ 245

Last change on this file since 245 was 241, checked in by contact, 5 years ago

Remove Ring.Close

This is unused.

File size: 7.9 KB
RevLine 
[101]1package soju
2
3import (
[218]4 "fmt"
[101]5 "time"
[103]6
7 "gopkg.in/irc.v3"
[101]8)
9
[165]10type event interface{}
11
12type eventUpstreamMessage struct {
[103]13 msg *irc.Message
14 uc *upstreamConn
15}
16
[218]17type eventUpstreamConnectionError struct {
18 net *network
19 err error
20}
21
[196]22type eventUpstreamConnected struct {
23 uc *upstreamConn
24}
25
[179]26type eventUpstreamDisconnected struct {
27 uc *upstreamConn
28}
29
[218]30type eventUpstreamError struct {
31 uc *upstreamConn
32 err error
33}
34
[165]35type eventDownstreamMessage struct {
[103]36 msg *irc.Message
37 dc *downstreamConn
38}
39
[166]40type eventDownstreamConnected struct {
41 dc *downstreamConn
42}
43
[167]44type eventDownstreamDisconnected struct {
45 dc *downstreamConn
46}
47
[101]48type network struct {
49 Network
[202]50 user *user
51 ring *Ring
52 stopped chan struct{}
[131]53
[237]54 conn *upstreamConn
[218]55 history map[string]uint64
56 lastError error
[101]57}
58
59func newNetwork(user *user, record *Network) *network {
60 return &network{
61 Network: *record,
62 user: user,
[143]63 ring: NewRing(user.srv.RingCap),
[202]64 stopped: make(chan struct{}),
[131]65 history: make(map[string]uint64),
[101]66 }
67}
68
[218]69func (net *network) forEachDownstream(f func(*downstreamConn)) {
70 net.user.forEachDownstream(func(dc *downstreamConn) {
71 if dc.network != nil && dc.network != net {
72 return
73 }
74 f(dc)
75 })
76}
77
[101]78func (net *network) run() {
79 var lastTry time.Time
80 for {
[202]81 select {
82 case <-net.stopped:
83 return
84 default:
85 // This space is intentionally left blank
86 }
87
[101]88 if dur := time.Now().Sub(lastTry); dur < retryConnectMinDelay {
89 delay := retryConnectMinDelay - dur
90 net.user.srv.Logger.Printf("waiting %v before trying to reconnect to %q", delay.Truncate(time.Second), net.Addr)
91 time.Sleep(delay)
92 }
93 lastTry = time.Now()
94
95 uc, err := connectToUpstream(net)
96 if err != nil {
97 net.user.srv.Logger.Printf("failed to connect to upstream server %q: %v", net.Addr, err)
[218]98 net.user.events <- eventUpstreamConnectionError{net, fmt.Errorf("failed to connect: %v", err)}
[101]99 continue
100 }
101
102 uc.register()
[197]103 if err := uc.runUntilRegistered(); err != nil {
104 uc.logger.Printf("failed to register: %v", err)
[218]105 net.user.events <- eventUpstreamConnectionError{net, fmt.Errorf("failed to register: %v", err)}
[197]106 uc.Close()
107 continue
108 }
[101]109
[196]110 net.user.events <- eventUpstreamConnected{uc}
[165]111 if err := uc.readMessages(net.user.events); err != nil {
[101]112 uc.logger.Printf("failed to handle messages: %v", err)
[218]113 net.user.events <- eventUpstreamError{uc, fmt.Errorf("failed to handle messages: %v", err)}
[101]114 }
115 uc.Close()
[179]116 net.user.events <- eventUpstreamDisconnected{uc}
[101]117 }
118}
119
[136]120func (net *network) upstream() *upstreamConn {
121 return net.conn
122}
123
[202]124func (net *network) Stop() {
125 select {
126 case <-net.stopped:
127 return
128 default:
129 close(net.stopped)
130 }
131
132 if uc := net.upstream(); uc != nil {
133 uc.Close()
134 }
135}
136
[222]137func (net *network) createUpdateChannel(ch *Channel) error {
138 if dbCh, err := net.user.srv.db.GetChannel(net.ID, ch.Name); err == nil {
139 ch.ID = dbCh.ID
140 } else if err != ErrNoSuchChannel {
141 return err
142 }
143 return net.user.srv.db.StoreChannel(net.ID, ch)
144}
145
146func (net *network) deleteChannel(name string) error {
147 return net.user.srv.db.DeleteChannel(net.ID, name)
148}
149
[101]150type user struct {
151 User
152 srv *Server
153
[165]154 events chan event
[103]155
[101]156 networks []*network
157 downstreamConns []*downstreamConn
[177]158
159 // LIST commands in progress
[179]160 pendingLISTs []pendingLIST
[101]161}
162
[177]163type pendingLIST struct {
164 downstreamID uint64
165 // list of per-upstream LIST commands not yet sent or completed
166 pendingCommands map[int64]*irc.Message
167}
168
[101]169func newUser(srv *Server, record *User) *user {
170 return &user{
[165]171 User: *record,
172 srv: srv,
173 events: make(chan event, 64),
[101]174 }
175}
176
177func (u *user) forEachNetwork(f func(*network)) {
178 for _, network := range u.networks {
179 f(network)
180 }
181}
182
183func (u *user) forEachUpstream(f func(uc *upstreamConn)) {
184 for _, network := range u.networks {
[136]185 uc := network.upstream()
[197]186 if uc == nil {
[101]187 continue
188 }
189 f(uc)
190 }
191}
192
193func (u *user) forEachDownstream(f func(dc *downstreamConn)) {
194 for _, dc := range u.downstreamConns {
195 f(dc)
196 }
197}
198
199func (u *user) getNetwork(name string) *network {
200 for _, network := range u.networks {
201 if network.Addr == name {
202 return network
203 }
[201]204 if network.Name != "" && network.Name == name {
205 return network
206 }
[101]207 }
208 return nil
209}
210
211func (u *user) run() {
212 networks, err := u.srv.db.ListNetworks(u.Username)
213 if err != nil {
214 u.srv.Logger.Printf("failed to list networks for user %q: %v", u.Username, err)
215 return
216 }
217
218 for _, record := range networks {
219 network := newNetwork(u, &record)
220 u.networks = append(u.networks, network)
221
222 go network.run()
223 }
[103]224
[165]225 for e := range u.events {
226 switch e := e.(type) {
[196]227 case eventUpstreamConnected:
[198]228 uc := e.uc
[199]229
230 uc.network.conn = uc
231
[198]232 uc.updateAway()
[218]233
234 uc.forEachDownstream(func(dc *downstreamConn) {
[223]235 sendServiceNOTICE(dc, fmt.Sprintf("connected to %s", uc.network.GetName()))
[218]236 })
237 uc.network.lastError = nil
[179]238 case eventUpstreamDisconnected:
239 uc := e.uc
[199]240
241 uc.network.conn = nil
242
[215]243 for _, ml := range uc.messageLoggers {
244 if err := ml.Close(); err != nil {
245 uc.logger.Printf("failed to close message logger: %v", err)
246 }
[179]247 }
[199]248
[181]249 uc.endPendingLISTs(true)
[218]250
251 if uc.network.lastError == nil {
252 uc.forEachDownstream(func(dc *downstreamConn) {
[223]253 sendServiceNOTICE(dc, fmt.Sprintf("disconnected from %s", uc.network.GetName()))
[218]254 })
255 }
256 case eventUpstreamConnectionError:
257 net := e.net
258
259 if net.lastError == nil || net.lastError.Error() != e.err.Error() {
260 net.forEachDownstream(func(dc *downstreamConn) {
[223]261 sendServiceNOTICE(dc, fmt.Sprintf("failed connecting/registering to %s: %v", net.GetName(), e.err))
[218]262 })
263 }
264 net.lastError = e.err
265 case eventUpstreamError:
266 uc := e.uc
267
268 uc.forEachDownstream(func(dc *downstreamConn) {
[223]269 sendServiceNOTICE(dc, fmt.Sprintf("disconnected from %s: %v", uc.network.GetName(), e.err))
[218]270 })
271 uc.network.lastError = e.err
[165]272 case eventUpstreamMessage:
273 msg, uc := e.msg, e.uc
[175]274 if uc.isClosed() {
[133]275 uc.logger.Printf("ignoring message on closed connection: %v", msg)
276 break
277 }
[103]278 if err := uc.handleMessage(msg); err != nil {
279 uc.logger.Printf("failed to handle message %q: %v", msg, err)
280 }
[166]281 case eventDownstreamConnected:
282 dc := e.dc
[168]283
284 if err := dc.welcome(); err != nil {
285 dc.logger.Printf("failed to handle new registered connection: %v", err)
286 break
287 }
288
[166]289 u.downstreamConns = append(u.downstreamConns, dc)
[198]290
291 u.forEachUpstream(func(uc *upstreamConn) {
292 uc.updateAway()
293 })
[167]294 case eventDownstreamDisconnected:
295 dc := e.dc
[204]296
[231]297 dc.forEachNetwork(func(net *network) {
298 seq := net.ring.Cur()
[204]299 net.history[dc.clientName] = seq
[231]300 })
[204]301
[167]302 for i := range u.downstreamConns {
303 if u.downstreamConns[i] == dc {
304 u.downstreamConns = append(u.downstreamConns[:i], u.downstreamConns[i+1:]...)
305 break
306 }
307 }
[198]308
309 u.forEachUpstream(func(uc *upstreamConn) {
310 uc.updateAway()
311 })
[165]312 case eventDownstreamMessage:
313 msg, dc := e.msg, e.dc
[133]314 if dc.isClosed() {
315 dc.logger.Printf("ignoring message on closed connection: %v", msg)
316 break
317 }
[103]318 err := dc.handleMessage(msg)
319 if ircErr, ok := err.(ircError); ok {
320 ircErr.Message.Prefix = dc.srv.prefix()
321 dc.SendMessage(ircErr.Message)
322 } else if err != nil {
323 dc.logger.Printf("failed to handle message %q: %v", msg, err)
324 dc.Close()
325 }
[165]326 default:
327 u.srv.Logger.Printf("received unknown event type: %T", e)
[103]328 }
329 }
[101]330}
331
[120]332func (u *user) createNetwork(net *Network) (*network, error) {
[144]333 if net.ID != 0 {
334 panic("tried creating an already-existing network")
335 }
336
[120]337 network := newNetwork(u, net)
[101]338 err := u.srv.db.StoreNetwork(u.Username, &network.Network)
339 if err != nil {
340 return nil, err
341 }
[144]342
[101]343 u.networks = append(u.networks, network)
[144]344
[101]345 go network.run()
346 return network, nil
347}
[202]348
349func (u *user) deleteNetwork(id int64) error {
350 for i, net := range u.networks {
351 if net.ID != id {
352 continue
353 }
354
355 if err := u.srv.db.DeleteNetwork(net.ID); err != nil {
356 return err
357 }
358
359 u.forEachDownstream(func(dc *downstreamConn) {
360 if dc.network != nil && dc.network == net {
361 dc.Close()
362 }
363 })
364
365 net.Stop()
366 u.networks = append(u.networks[:i], u.networks[i+1:]...)
367 return nil
368 }
369
370 panic("tried deleting a non-existing network")
371}
Note: See TracBrowser for help on using the repository browser.