source: code/trunk/user.go@ 217

Last change on this file since 217 was 215, checked in by contact, 5 years ago

Introduce messageLogger

This centralizes formatting related to message logging in a single
place.

File size: 6.3 KB
RevLine 
[101]1package soju
2
3import (
4 "sync"
5 "time"
[103]6
7 "gopkg.in/irc.v3"
[101]8)
9
[165]10type event interface{}
11
12type eventUpstreamMessage struct {
[103]13 msg *irc.Message
14 uc *upstreamConn
15}
16
[196]17type eventUpstreamConnected struct {
18 uc *upstreamConn
19}
20
[179]21type eventUpstreamDisconnected struct {
22 uc *upstreamConn
23}
24
[165]25type eventDownstreamMessage struct {
[103]26 msg *irc.Message
27 dc *downstreamConn
28}
29
[166]30type eventDownstreamConnected struct {
31 dc *downstreamConn
32}
33
[167]34type eventDownstreamDisconnected struct {
35 dc *downstreamConn
36}
37
[101]38type network struct {
39 Network
[202]40 user *user
41 ring *Ring
42 stopped chan struct{}
[131]43
44 history map[string]uint64
[204]45
46 lock sync.Mutex
47 conn *upstreamConn
[101]48}
49
50func newNetwork(user *user, record *Network) *network {
51 return &network{
52 Network: *record,
53 user: user,
[143]54 ring: NewRing(user.srv.RingCap),
[202]55 stopped: make(chan struct{}),
[131]56 history: make(map[string]uint64),
[101]57 }
58}
59
60func (net *network) run() {
61 var lastTry time.Time
62 for {
[202]63 select {
64 case <-net.stopped:
65 return
66 default:
67 // This space is intentionally left blank
68 }
69
[101]70 if dur := time.Now().Sub(lastTry); dur < retryConnectMinDelay {
71 delay := retryConnectMinDelay - dur
72 net.user.srv.Logger.Printf("waiting %v before trying to reconnect to %q", delay.Truncate(time.Second), net.Addr)
73 time.Sleep(delay)
74 }
75 lastTry = time.Now()
76
77 uc, err := connectToUpstream(net)
78 if err != nil {
79 net.user.srv.Logger.Printf("failed to connect to upstream server %q: %v", net.Addr, err)
80 continue
81 }
82
83 uc.register()
[197]84 if err := uc.runUntilRegistered(); err != nil {
85 uc.logger.Printf("failed to register: %v", err)
86 uc.Close()
87 continue
88 }
[101]89
[196]90 net.user.events <- eventUpstreamConnected{uc}
[165]91 if err := uc.readMessages(net.user.events); err != nil {
[101]92 uc.logger.Printf("failed to handle messages: %v", err)
93 }
94 uc.Close()
[179]95 net.user.events <- eventUpstreamDisconnected{uc}
[101]96 }
97}
98
[136]99func (net *network) upstream() *upstreamConn {
100 net.lock.Lock()
101 defer net.lock.Unlock()
102 return net.conn
103}
104
[202]105func (net *network) Stop() {
106 select {
107 case <-net.stopped:
108 return
109 default:
110 close(net.stopped)
111 }
112
113 if uc := net.upstream(); uc != nil {
114 uc.Close()
115 }
116}
117
[101]118type user struct {
119 User
120 srv *Server
121
[165]122 events chan event
[103]123
[101]124 networks []*network
125 downstreamConns []*downstreamConn
[177]126
127 // LIST commands in progress
[179]128 pendingLISTs []pendingLIST
[101]129}
130
[177]131type pendingLIST struct {
132 downstreamID uint64
133 // list of per-upstream LIST commands not yet sent or completed
134 pendingCommands map[int64]*irc.Message
135}
136
[101]137func newUser(srv *Server, record *User) *user {
138 return &user{
[165]139 User: *record,
140 srv: srv,
141 events: make(chan event, 64),
[101]142 }
143}
144
145func (u *user) forEachNetwork(f func(*network)) {
146 for _, network := range u.networks {
147 f(network)
148 }
149}
150
151func (u *user) forEachUpstream(f func(uc *upstreamConn)) {
152 for _, network := range u.networks {
[136]153 uc := network.upstream()
[197]154 if uc == nil {
[101]155 continue
156 }
157 f(uc)
158 }
159}
160
161func (u *user) forEachDownstream(f func(dc *downstreamConn)) {
162 for _, dc := range u.downstreamConns {
163 f(dc)
164 }
165}
166
167func (u *user) getNetwork(name string) *network {
168 for _, network := range u.networks {
169 if network.Addr == name {
170 return network
171 }
[201]172 if network.Name != "" && network.Name == name {
173 return network
174 }
[101]175 }
176 return nil
177}
178
179func (u *user) run() {
180 networks, err := u.srv.db.ListNetworks(u.Username)
181 if err != nil {
182 u.srv.Logger.Printf("failed to list networks for user %q: %v", u.Username, err)
183 return
184 }
185
186 for _, record := range networks {
187 network := newNetwork(u, &record)
188 u.networks = append(u.networks, network)
189
190 go network.run()
191 }
[103]192
[165]193 for e := range u.events {
194 switch e := e.(type) {
[196]195 case eventUpstreamConnected:
[198]196 uc := e.uc
[199]197
198 uc.network.lock.Lock()
199 uc.network.conn = uc
200 uc.network.lock.Unlock()
201
[198]202 uc.updateAway()
[179]203 case eventUpstreamDisconnected:
204 uc := e.uc
[199]205
206 uc.network.lock.Lock()
207 uc.network.conn = nil
208 uc.network.lock.Unlock()
209
[215]210 for _, ml := range uc.messageLoggers {
211 if err := ml.Close(); err != nil {
212 uc.logger.Printf("failed to close message logger: %v", err)
213 }
[179]214 }
[199]215
[181]216 uc.endPendingLISTs(true)
[165]217 case eventUpstreamMessage:
218 msg, uc := e.msg, e.uc
[175]219 if uc.isClosed() {
[133]220 uc.logger.Printf("ignoring message on closed connection: %v", msg)
221 break
222 }
[103]223 if err := uc.handleMessage(msg); err != nil {
224 uc.logger.Printf("failed to handle message %q: %v", msg, err)
225 }
[166]226 case eventDownstreamConnected:
227 dc := e.dc
[168]228
229 if err := dc.welcome(); err != nil {
230 dc.logger.Printf("failed to handle new registered connection: %v", err)
231 break
232 }
233
[166]234 u.downstreamConns = append(u.downstreamConns, dc)
[198]235
236 u.forEachUpstream(func(uc *upstreamConn) {
237 uc.updateAway()
238 })
[167]239 case eventDownstreamDisconnected:
240 dc := e.dc
[204]241
242 for net, rc := range dc.ringConsumers {
243 seq := rc.Close()
244 net.history[dc.clientName] = seq
245 }
246
[167]247 for i := range u.downstreamConns {
248 if u.downstreamConns[i] == dc {
249 u.downstreamConns = append(u.downstreamConns[:i], u.downstreamConns[i+1:]...)
250 break
251 }
252 }
[198]253
254 u.forEachUpstream(func(uc *upstreamConn) {
255 uc.updateAway()
256 })
[165]257 case eventDownstreamMessage:
258 msg, dc := e.msg, e.dc
[133]259 if dc.isClosed() {
260 dc.logger.Printf("ignoring message on closed connection: %v", msg)
261 break
262 }
[103]263 err := dc.handleMessage(msg)
264 if ircErr, ok := err.(ircError); ok {
265 ircErr.Message.Prefix = dc.srv.prefix()
266 dc.SendMessage(ircErr.Message)
267 } else if err != nil {
268 dc.logger.Printf("failed to handle message %q: %v", msg, err)
269 dc.Close()
270 }
[165]271 default:
272 u.srv.Logger.Printf("received unknown event type: %T", e)
[103]273 }
274 }
[101]275}
276
[120]277func (u *user) createNetwork(net *Network) (*network, error) {
[144]278 if net.ID != 0 {
279 panic("tried creating an already-existing network")
280 }
281
[120]282 network := newNetwork(u, net)
[101]283 err := u.srv.db.StoreNetwork(u.Username, &network.Network)
284 if err != nil {
285 return nil, err
286 }
[144]287
288 u.forEachDownstream(func(dc *downstreamConn) {
289 if dc.network == nil {
290 dc.runNetwork(network, false)
291 }
292 })
293
[101]294 u.networks = append(u.networks, network)
[144]295
[101]296 go network.run()
297 return network, nil
298}
[202]299
300func (u *user) deleteNetwork(id int64) error {
301 for i, net := range u.networks {
302 if net.ID != id {
303 continue
304 }
305
306 if err := u.srv.db.DeleteNetwork(net.ID); err != nil {
307 return err
308 }
309
310 u.forEachDownstream(func(dc *downstreamConn) {
311 if dc.network != nil && dc.network == net {
312 dc.Close()
313 }
314 })
315
316 net.Stop()
[203]317 net.ring.Close()
[202]318 u.networks = append(u.networks[:i], u.networks[i+1:]...)
319 return nil
320 }
321
322 panic("tried deleting a non-existing network")
323}
Note: See TracBrowser for help on using the repository browser.