source: code/trunk/user.go@ 219

Last change on this file since 219 was 218, checked in by delthas, 5 years ago

Send one NOTICE on new upstream disconnect/connect errors

In order to notify the user when we are disconnected from a network
(either due to an error, or due to a QUIT), and when we fail reconnecting,
this commit adds support for sending a short NOTICE message from the
service user to all relevant downstreams.

The last error is stored, and cleared on successful connection, to
ensure that the user is *not* flooded with identical connection error
messages, which can often happen when a server is down.

No lock is needed on lastError because it is only read and modified from
the user goroutine.

Closes: https://todo.sr.ht/~emersion/soju/27

File size: 7.8 KB
Line 
1package soju
2
3import (
4 "fmt"
5 "sync"
6 "time"
7
8 "gopkg.in/irc.v3"
9)
10
11type event interface{}
12
13type eventUpstreamMessage struct {
14 msg *irc.Message
15 uc *upstreamConn
16}
17
18type eventUpstreamConnectionError struct {
19 net *network
20 err error
21}
22
23type eventUpstreamConnected struct {
24 uc *upstreamConn
25}
26
27type eventUpstreamDisconnected struct {
28 uc *upstreamConn
29}
30
31type eventUpstreamError struct {
32 uc *upstreamConn
33 err error
34}
35
36type eventDownstreamMessage struct {
37 msg *irc.Message
38 dc *downstreamConn
39}
40
41type eventDownstreamConnected struct {
42 dc *downstreamConn
43}
44
45type eventDownstreamDisconnected struct {
46 dc *downstreamConn
47}
48
// network holds the runtime state of one network configured for a user.
type network struct {
	Network          // network record; copied from the database record in newNetwork
	user    *user    // owning user; events for this network are sent to user.events
	ring    *Ring    // created with the server's RingCap; closed when the network is deleted
	stopped chan struct{} // closed by Stop to make run return

	// history maps a downstream client name to the value returned by its
	// ring consumer's Close when that downstream disconnects.
	history map[string]uint64
	// lastError is the last connection error reported to downstreams. It is
	// cleared on successful connection and is only read and modified from
	// the user goroutine, so it needs no lock.
	lastError error

	lock sync.Mutex    // guards conn
	conn *upstreamConn // current upstream connection, nil when disconnected
}
61
62func newNetwork(user *user, record *Network) *network {
63 return &network{
64 Network: *record,
65 user: user,
66 ring: NewRing(user.srv.RingCap),
67 stopped: make(chan struct{}),
68 history: make(map[string]uint64),
69 }
70}
71
72func (net *network) forEachDownstream(f func(*downstreamConn)) {
73 net.user.forEachDownstream(func(dc *downstreamConn) {
74 if dc.network != nil && dc.network != net {
75 return
76 }
77 f(dc)
78 })
79}
80
81func (net *network) run() {
82 var lastTry time.Time
83 for {
84 select {
85 case <-net.stopped:
86 return
87 default:
88 // This space is intentionally left blank
89 }
90
91 if dur := time.Now().Sub(lastTry); dur < retryConnectMinDelay {
92 delay := retryConnectMinDelay - dur
93 net.user.srv.Logger.Printf("waiting %v before trying to reconnect to %q", delay.Truncate(time.Second), net.Addr)
94 time.Sleep(delay)
95 }
96 lastTry = time.Now()
97
98 uc, err := connectToUpstream(net)
99 if err != nil {
100 net.user.srv.Logger.Printf("failed to connect to upstream server %q: %v", net.Addr, err)
101 net.user.events <- eventUpstreamConnectionError{net, fmt.Errorf("failed to connect: %v", err)}
102 continue
103 }
104
105 uc.register()
106 if err := uc.runUntilRegistered(); err != nil {
107 uc.logger.Printf("failed to register: %v", err)
108 net.user.events <- eventUpstreamConnectionError{net, fmt.Errorf("failed to register: %v", err)}
109 uc.Close()
110 continue
111 }
112
113 net.user.events <- eventUpstreamConnected{uc}
114 if err := uc.readMessages(net.user.events); err != nil {
115 uc.logger.Printf("failed to handle messages: %v", err)
116 net.user.events <- eventUpstreamError{uc, fmt.Errorf("failed to handle messages: %v", err)}
117 }
118 uc.Close()
119 net.user.events <- eventUpstreamDisconnected{uc}
120 }
121}
122
123func (net *network) upstream() *upstreamConn {
124 net.lock.Lock()
125 defer net.lock.Unlock()
126 return net.conn
127}
128
129func (net *network) Stop() {
130 select {
131 case <-net.stopped:
132 return
133 default:
134 close(net.stopped)
135 }
136
137 if uc := net.upstream(); uc != nil {
138 uc.Close()
139 }
140}
141
// user holds the runtime state of one user: its networks, its downstream
// connections and the event channel feeding its event loop (see run).
type user struct {
	User        // user record; copied from the record passed to newUser
	srv *Server // server this user belongs to

	// events carries everything the user event loop has to process.
	events chan event

	networks        []*network        // networks of this user
	downstreamConns []*downstreamConn // downstream connections accepted by the event loop

	// LIST commands in progress
	pendingLISTs []pendingLIST
}
154
// pendingLIST tracks one LIST command in progress.
type pendingLIST struct {
	// presumably the ID of the downstream connection that issued the LIST
	// (verify against the code that fills this in)
	downstreamID uint64
	// list of per-upstream LIST commands not yet sent or completed
	// NOTE(review): the int64 key looks like a network ID; confirm
	pendingCommands map[int64]*irc.Message
}
160
161func newUser(srv *Server, record *User) *user {
162 return &user{
163 User: *record,
164 srv: srv,
165 events: make(chan event, 64),
166 }
167}
168
169func (u *user) forEachNetwork(f func(*network)) {
170 for _, network := range u.networks {
171 f(network)
172 }
173}
174
175func (u *user) forEachUpstream(f func(uc *upstreamConn)) {
176 for _, network := range u.networks {
177 uc := network.upstream()
178 if uc == nil {
179 continue
180 }
181 f(uc)
182 }
183}
184
185func (u *user) forEachDownstream(f func(dc *downstreamConn)) {
186 for _, dc := range u.downstreamConns {
187 f(dc)
188 }
189}
190
191func (u *user) getNetwork(name string) *network {
192 for _, network := range u.networks {
193 if network.Addr == name {
194 return network
195 }
196 if network.Name != "" && network.Name == name {
197 return network
198 }
199 }
200 return nil
201}
202
203func (u *user) run() {
204 networks, err := u.srv.db.ListNetworks(u.Username)
205 if err != nil {
206 u.srv.Logger.Printf("failed to list networks for user %q: %v", u.Username, err)
207 return
208 }
209
210 for _, record := range networks {
211 network := newNetwork(u, &record)
212 u.networks = append(u.networks, network)
213
214 go network.run()
215 }
216
217 for e := range u.events {
218 switch e := e.(type) {
219 case eventUpstreamConnected:
220 uc := e.uc
221
222 uc.network.lock.Lock()
223 uc.network.conn = uc
224 uc.network.lock.Unlock()
225
226 uc.updateAway()
227
228 uc.forEachDownstream(func(dc *downstreamConn) {
229 sendServiceNOTICE(dc, fmt.Sprintf("connected to %s", uc.network.Name))
230 })
231 uc.network.lastError = nil
232 case eventUpstreamDisconnected:
233 uc := e.uc
234
235 uc.network.lock.Lock()
236 uc.network.conn = nil
237 uc.network.lock.Unlock()
238
239 for _, ml := range uc.messageLoggers {
240 if err := ml.Close(); err != nil {
241 uc.logger.Printf("failed to close message logger: %v", err)
242 }
243 }
244
245 uc.endPendingLISTs(true)
246
247 if uc.network.lastError == nil {
248 uc.forEachDownstream(func(dc *downstreamConn) {
249 sendServiceNOTICE(dc, fmt.Sprintf("disconnected from %s", uc.network.Name))
250 })
251 }
252 case eventUpstreamConnectionError:
253 net := e.net
254
255 if net.lastError == nil || net.lastError.Error() != e.err.Error() {
256 net.forEachDownstream(func(dc *downstreamConn) {
257 sendServiceNOTICE(dc, fmt.Sprintf("failed connecting/registering to %s: %v", net.Name, e.err))
258 })
259 }
260 net.lastError = e.err
261 case eventUpstreamError:
262 uc := e.uc
263
264 uc.forEachDownstream(func(dc *downstreamConn) {
265 sendServiceNOTICE(dc, fmt.Sprintf("disconnected from %s: %v", uc.network.Name, e.err))
266 })
267 uc.network.lastError = e.err
268 case eventUpstreamMessage:
269 msg, uc := e.msg, e.uc
270 if uc.isClosed() {
271 uc.logger.Printf("ignoring message on closed connection: %v", msg)
272 break
273 }
274 if err := uc.handleMessage(msg); err != nil {
275 uc.logger.Printf("failed to handle message %q: %v", msg, err)
276 }
277 case eventDownstreamConnected:
278 dc := e.dc
279
280 if err := dc.welcome(); err != nil {
281 dc.logger.Printf("failed to handle new registered connection: %v", err)
282 break
283 }
284
285 u.downstreamConns = append(u.downstreamConns, dc)
286
287 u.forEachUpstream(func(uc *upstreamConn) {
288 uc.updateAway()
289 })
290 case eventDownstreamDisconnected:
291 dc := e.dc
292
293 for net, rc := range dc.ringConsumers {
294 seq := rc.Close()
295 net.history[dc.clientName] = seq
296 }
297
298 for i := range u.downstreamConns {
299 if u.downstreamConns[i] == dc {
300 u.downstreamConns = append(u.downstreamConns[:i], u.downstreamConns[i+1:]...)
301 break
302 }
303 }
304
305 u.forEachUpstream(func(uc *upstreamConn) {
306 uc.updateAway()
307 })
308 case eventDownstreamMessage:
309 msg, dc := e.msg, e.dc
310 if dc.isClosed() {
311 dc.logger.Printf("ignoring message on closed connection: %v", msg)
312 break
313 }
314 err := dc.handleMessage(msg)
315 if ircErr, ok := err.(ircError); ok {
316 ircErr.Message.Prefix = dc.srv.prefix()
317 dc.SendMessage(ircErr.Message)
318 } else if err != nil {
319 dc.logger.Printf("failed to handle message %q: %v", msg, err)
320 dc.Close()
321 }
322 default:
323 u.srv.Logger.Printf("received unknown event type: %T", e)
324 }
325 }
326}
327
328func (u *user) createNetwork(net *Network) (*network, error) {
329 if net.ID != 0 {
330 panic("tried creating an already-existing network")
331 }
332
333 network := newNetwork(u, net)
334 err := u.srv.db.StoreNetwork(u.Username, &network.Network)
335 if err != nil {
336 return nil, err
337 }
338
339 u.forEachDownstream(func(dc *downstreamConn) {
340 if dc.network == nil {
341 dc.runNetwork(network, false)
342 }
343 })
344
345 u.networks = append(u.networks, network)
346
347 go network.run()
348 return network, nil
349}
350
351func (u *user) deleteNetwork(id int64) error {
352 for i, net := range u.networks {
353 if net.ID != id {
354 continue
355 }
356
357 if err := u.srv.db.DeleteNetwork(net.ID); err != nil {
358 return err
359 }
360
361 u.forEachDownstream(func(dc *downstreamConn) {
362 if dc.network != nil && dc.network == net {
363 dc.Close()
364 }
365 })
366
367 net.Stop()
368 net.ring.Close()
369 u.networks = append(u.networks[:i], u.networks[i+1:]...)
370 return nil
371 }
372
373 panic("tried deleting a non-existing network")
374}
Note: See TracBrowser for help on using the repository browser.