source: code/trunk/user.go@ 219

Last change on this file since 219 was 218, checked in by delthas, 5 years ago

Send one NOTICE on new upstream disconnect/connect errors

In order to notify the user when we are disconnected from a network
(either due to an error, or due to a QUIT), and when we fail reconnecting,
this commit adds support for sending a short NOTICE message from the
service user to all relevant downstreams.

The last error is stored, and cleared on successful connection, to
ensure that the user is *not* flooded with identical connection error
messages, which can often happen when a server is down.

No lock is needed on lastError because it is only read and modified from
the user goroutine.

Closes: https://todo.sr.ht/~emersion/soju/27

File size: 7.8 KB
RevLine 
[101]1package soju
2
3import (
[218]4 "fmt"
[101]5 "sync"
6 "time"
[103]7
8 "gopkg.in/irc.v3"
[101]9)
10
[165]11type event interface{}
12
13type eventUpstreamMessage struct {
[103]14 msg *irc.Message
15 uc *upstreamConn
16}
17
[218]18type eventUpstreamConnectionError struct {
19 net *network
20 err error
21}
22
[196]23type eventUpstreamConnected struct {
24 uc *upstreamConn
25}
26
[179]27type eventUpstreamDisconnected struct {
28 uc *upstreamConn
29}
30
[218]31type eventUpstreamError struct {
32 uc *upstreamConn
33 err error
34}
35
[165]36type eventDownstreamMessage struct {
[103]37 msg *irc.Message
38 dc *downstreamConn
39}
40
[166]41type eventDownstreamConnected struct {
42 dc *downstreamConn
43}
44
[167]45type eventDownstreamDisconnected struct {
46 dc *downstreamConn
47}
48
[101]49type network struct {
50 Network
[202]51 user *user
52 ring *Ring
53 stopped chan struct{}
[131]54
[218]55 history map[string]uint64
56 lastError error
[204]57
58 lock sync.Mutex
59 conn *upstreamConn
[101]60}
61
62func newNetwork(user *user, record *Network) *network {
63 return &network{
64 Network: *record,
65 user: user,
[143]66 ring: NewRing(user.srv.RingCap),
[202]67 stopped: make(chan struct{}),
[131]68 history: make(map[string]uint64),
[101]69 }
70}
71
[218]72func (net *network) forEachDownstream(f func(*downstreamConn)) {
73 net.user.forEachDownstream(func(dc *downstreamConn) {
74 if dc.network != nil && dc.network != net {
75 return
76 }
77 f(dc)
78 })
79}
80
[101]81func (net *network) run() {
82 var lastTry time.Time
83 for {
[202]84 select {
85 case <-net.stopped:
86 return
87 default:
88 // This space is intentionally left blank
89 }
90
[101]91 if dur := time.Now().Sub(lastTry); dur < retryConnectMinDelay {
92 delay := retryConnectMinDelay - dur
93 net.user.srv.Logger.Printf("waiting %v before trying to reconnect to %q", delay.Truncate(time.Second), net.Addr)
94 time.Sleep(delay)
95 }
96 lastTry = time.Now()
97
98 uc, err := connectToUpstream(net)
99 if err != nil {
100 net.user.srv.Logger.Printf("failed to connect to upstream server %q: %v", net.Addr, err)
[218]101 net.user.events <- eventUpstreamConnectionError{net, fmt.Errorf("failed to connect: %v", err)}
[101]102 continue
103 }
104
105 uc.register()
[197]106 if err := uc.runUntilRegistered(); err != nil {
107 uc.logger.Printf("failed to register: %v", err)
[218]108 net.user.events <- eventUpstreamConnectionError{net, fmt.Errorf("failed to register: %v", err)}
[197]109 uc.Close()
110 continue
111 }
[101]112
[196]113 net.user.events <- eventUpstreamConnected{uc}
[165]114 if err := uc.readMessages(net.user.events); err != nil {
[101]115 uc.logger.Printf("failed to handle messages: %v", err)
[218]116 net.user.events <- eventUpstreamError{uc, fmt.Errorf("failed to handle messages: %v", err)}
[101]117 }
118 uc.Close()
[179]119 net.user.events <- eventUpstreamDisconnected{uc}
[101]120 }
121}
122
[136]123func (net *network) upstream() *upstreamConn {
124 net.lock.Lock()
125 defer net.lock.Unlock()
126 return net.conn
127}
128
[202]129func (net *network) Stop() {
130 select {
131 case <-net.stopped:
132 return
133 default:
134 close(net.stopped)
135 }
136
137 if uc := net.upstream(); uc != nil {
138 uc.Close()
139 }
140}
141
[101]142type user struct {
143 User
144 srv *Server
145
[165]146 events chan event
[103]147
[101]148 networks []*network
149 downstreamConns []*downstreamConn
[177]150
151 // LIST commands in progress
[179]152 pendingLISTs []pendingLIST
[101]153}
154
[177]155type pendingLIST struct {
156 downstreamID uint64
157 // list of per-upstream LIST commands not yet sent or completed
158 pendingCommands map[int64]*irc.Message
159}
160
[101]161func newUser(srv *Server, record *User) *user {
162 return &user{
[165]163 User: *record,
164 srv: srv,
165 events: make(chan event, 64),
[101]166 }
167}
168
169func (u *user) forEachNetwork(f func(*network)) {
170 for _, network := range u.networks {
171 f(network)
172 }
173}
174
175func (u *user) forEachUpstream(f func(uc *upstreamConn)) {
176 for _, network := range u.networks {
[136]177 uc := network.upstream()
[197]178 if uc == nil {
[101]179 continue
180 }
181 f(uc)
182 }
183}
184
185func (u *user) forEachDownstream(f func(dc *downstreamConn)) {
186 for _, dc := range u.downstreamConns {
187 f(dc)
188 }
189}
190
191func (u *user) getNetwork(name string) *network {
192 for _, network := range u.networks {
193 if network.Addr == name {
194 return network
195 }
[201]196 if network.Name != "" && network.Name == name {
197 return network
198 }
[101]199 }
200 return nil
201}
202
203func (u *user) run() {
204 networks, err := u.srv.db.ListNetworks(u.Username)
205 if err != nil {
206 u.srv.Logger.Printf("failed to list networks for user %q: %v", u.Username, err)
207 return
208 }
209
210 for _, record := range networks {
211 network := newNetwork(u, &record)
212 u.networks = append(u.networks, network)
213
214 go network.run()
215 }
[103]216
[165]217 for e := range u.events {
218 switch e := e.(type) {
[196]219 case eventUpstreamConnected:
[198]220 uc := e.uc
[199]221
222 uc.network.lock.Lock()
223 uc.network.conn = uc
224 uc.network.lock.Unlock()
225
[198]226 uc.updateAway()
[218]227
228 uc.forEachDownstream(func(dc *downstreamConn) {
229 sendServiceNOTICE(dc, fmt.Sprintf("connected to %s", uc.network.Name))
230 })
231 uc.network.lastError = nil
[179]232 case eventUpstreamDisconnected:
233 uc := e.uc
[199]234
235 uc.network.lock.Lock()
236 uc.network.conn = nil
237 uc.network.lock.Unlock()
238
[215]239 for _, ml := range uc.messageLoggers {
240 if err := ml.Close(); err != nil {
241 uc.logger.Printf("failed to close message logger: %v", err)
242 }
[179]243 }
[199]244
[181]245 uc.endPendingLISTs(true)
[218]246
247 if uc.network.lastError == nil {
248 uc.forEachDownstream(func(dc *downstreamConn) {
249 sendServiceNOTICE(dc, fmt.Sprintf("disconnected from %s", uc.network.Name))
250 })
251 }
252 case eventUpstreamConnectionError:
253 net := e.net
254
255 if net.lastError == nil || net.lastError.Error() != e.err.Error() {
256 net.forEachDownstream(func(dc *downstreamConn) {
257 sendServiceNOTICE(dc, fmt.Sprintf("failed connecting/registering to %s: %v", net.Name, e.err))
258 })
259 }
260 net.lastError = e.err
261 case eventUpstreamError:
262 uc := e.uc
263
264 uc.forEachDownstream(func(dc *downstreamConn) {
265 sendServiceNOTICE(dc, fmt.Sprintf("disconnected from %s: %v", uc.network.Name, e.err))
266 })
267 uc.network.lastError = e.err
[165]268 case eventUpstreamMessage:
269 msg, uc := e.msg, e.uc
[175]270 if uc.isClosed() {
[133]271 uc.logger.Printf("ignoring message on closed connection: %v", msg)
272 break
273 }
[103]274 if err := uc.handleMessage(msg); err != nil {
275 uc.logger.Printf("failed to handle message %q: %v", msg, err)
276 }
[166]277 case eventDownstreamConnected:
278 dc := e.dc
[168]279
280 if err := dc.welcome(); err != nil {
281 dc.logger.Printf("failed to handle new registered connection: %v", err)
282 break
283 }
284
[166]285 u.downstreamConns = append(u.downstreamConns, dc)
[198]286
287 u.forEachUpstream(func(uc *upstreamConn) {
288 uc.updateAway()
289 })
[167]290 case eventDownstreamDisconnected:
291 dc := e.dc
[204]292
293 for net, rc := range dc.ringConsumers {
294 seq := rc.Close()
295 net.history[dc.clientName] = seq
296 }
297
[167]298 for i := range u.downstreamConns {
299 if u.downstreamConns[i] == dc {
300 u.downstreamConns = append(u.downstreamConns[:i], u.downstreamConns[i+1:]...)
301 break
302 }
303 }
[198]304
305 u.forEachUpstream(func(uc *upstreamConn) {
306 uc.updateAway()
307 })
[165]308 case eventDownstreamMessage:
309 msg, dc := e.msg, e.dc
[133]310 if dc.isClosed() {
311 dc.logger.Printf("ignoring message on closed connection: %v", msg)
312 break
313 }
[103]314 err := dc.handleMessage(msg)
315 if ircErr, ok := err.(ircError); ok {
316 ircErr.Message.Prefix = dc.srv.prefix()
317 dc.SendMessage(ircErr.Message)
318 } else if err != nil {
319 dc.logger.Printf("failed to handle message %q: %v", msg, err)
320 dc.Close()
321 }
[165]322 default:
323 u.srv.Logger.Printf("received unknown event type: %T", e)
[103]324 }
325 }
[101]326}
327
[120]328func (u *user) createNetwork(net *Network) (*network, error) {
[144]329 if net.ID != 0 {
330 panic("tried creating an already-existing network")
331 }
332
[120]333 network := newNetwork(u, net)
[101]334 err := u.srv.db.StoreNetwork(u.Username, &network.Network)
335 if err != nil {
336 return nil, err
337 }
[144]338
339 u.forEachDownstream(func(dc *downstreamConn) {
340 if dc.network == nil {
341 dc.runNetwork(network, false)
342 }
343 })
344
[101]345 u.networks = append(u.networks, network)
[144]346
[101]347 go network.run()
348 return network, nil
349}
[202]350
351func (u *user) deleteNetwork(id int64) error {
352 for i, net := range u.networks {
353 if net.ID != id {
354 continue
355 }
356
357 if err := u.srv.db.DeleteNetwork(net.ID); err != nil {
358 return err
359 }
360
361 u.forEachDownstream(func(dc *downstreamConn) {
362 if dc.network != nil && dc.network == net {
363 dc.Close()
364 }
365 })
366
367 net.Stop()
[203]368 net.ring.Close()
[202]369 u.networks = append(u.networks[:i], u.networks[i+1:]...)
370 return nil
371 }
372
373 panic("tried deleting a non-existing network")
374}
Note: See TracBrowser for help on using the repository browser.