source: code/trunk/user.go@ 201

Last change on this file since 201 was 201, checked in by contact, 5 years ago

Make user.getNetwork handle Network.Name

File size: 5.4 KB
Line 
1package soju
2
3import (
4 "sync"
5 "time"
6
7 "gopkg.in/irc.v3"
8)
9
// event is the element type of the user.events channel. user.run receives
// these values and dispatches on their concrete type.
type event interface{}
11
// eventUpstreamMessage pairs an IRC message with the upstream connection it
// belongs to. user.run passes msg to uc.handleMessage unless uc is already
// closed.
type eventUpstreamMessage struct {
	msg *irc.Message
	uc  *upstreamConn
}
16
// eventUpstreamConnected is sent by network.run once uc has registered
// successfully. user.run reacts by storing uc as the network's current
// connection and calling uc.updateAway.
type eventUpstreamConnected struct {
	uc *upstreamConn
}
20
// eventUpstreamDisconnected is sent by network.run after uc.readMessages has
// returned and uc has been closed. user.run reacts by clearing the network's
// current connection, closing uc's log files and ending its pending LISTs.
type eventUpstreamDisconnected struct {
	uc *upstreamConn
}
24
// eventDownstreamMessage pairs an IRC message with the downstream connection
// it belongs to. user.run passes msg to dc.handleMessage unless dc is already
// closed.
type eventDownstreamMessage struct {
	msg *irc.Message
	dc  *downstreamConn
}
29
// eventDownstreamConnected asks user.run to welcome dc (dc.welcome) and, on
// success, to add it to user.downstreamConns.
type eventDownstreamConnected struct {
	dc *downstreamConn
}
33
// eventDownstreamDisconnected asks user.run to remove dc from
// user.downstreamConns.
type eventDownstreamDisconnected struct {
	dc *downstreamConn
}
37
// network is the in-memory state kept for one network of a user. It embeds a
// copy of the Network record it was created from (see newNetwork).
type network struct {
	Network
	// user is the user this network belongs to.
	user *user
	// ring is created in newNetwork with NewRing(user.srv.RingCap).
	ring *Ring

	// lock is held around every access to conn visible in this file:
	// network.upstream reads conn and user.run writes it.
	lock sync.Mutex
	// conn is the current upstream connection. user.run sets it on
	// eventUpstreamConnected and resets it to nil on
	// eventUpstreamDisconnected.
	conn *upstreamConn
	// NOTE(review): the meaning of the keys and values is not visible in
	// this file; presumably a per-client position in ring. Confirm against
	// the code that reads and writes history.
	history map[string]uint64
}
47
48func newNetwork(user *user, record *Network) *network {
49 return &network{
50 Network: *record,
51 user: user,
52 ring: NewRing(user.srv.RingCap),
53 history: make(map[string]uint64),
54 }
55}
56
57func (net *network) run() {
58 var lastTry time.Time
59 for {
60 if dur := time.Now().Sub(lastTry); dur < retryConnectMinDelay {
61 delay := retryConnectMinDelay - dur
62 net.user.srv.Logger.Printf("waiting %v before trying to reconnect to %q", delay.Truncate(time.Second), net.Addr)
63 time.Sleep(delay)
64 }
65 lastTry = time.Now()
66
67 uc, err := connectToUpstream(net)
68 if err != nil {
69 net.user.srv.Logger.Printf("failed to connect to upstream server %q: %v", net.Addr, err)
70 continue
71 }
72
73 uc.register()
74 if err := uc.runUntilRegistered(); err != nil {
75 uc.logger.Printf("failed to register: %v", err)
76 uc.Close()
77 continue
78 }
79
80 net.user.events <- eventUpstreamConnected{uc}
81 if err := uc.readMessages(net.user.events); err != nil {
82 uc.logger.Printf("failed to handle messages: %v", err)
83 }
84 uc.Close()
85 net.user.events <- eventUpstreamDisconnected{uc}
86 }
87}
88
89func (net *network) upstream() *upstreamConn {
90 net.lock.Lock()
91 defer net.lock.Unlock()
92 return net.conn
93}
94
// user is the in-memory state kept for one user. It embeds a copy of the
// User record it was created from (see newUser).
type user struct {
	User
	srv *Server

	// events carries the values user.run reacts to. newUser creates it
	// with a buffer of 64.
	events chan event

	// networks is filled by user.run from the database and extended by
	// createNetwork.
	networks []*network
	// downstreamConns is maintained by user.run: a connection is appended
	// on eventDownstreamConnected and removed on
	// eventDownstreamDisconnected.
	downstreamConns []*downstreamConn

	// LIST commands in progress
	pendingLISTs []pendingLIST
}
107
// pendingLIST is the element type of user.pendingLISTs, the LIST commands in
// progress.
type pendingLIST struct {
	// NOTE(review): presumably the ID of the downstream connection that
	// issued the LIST; confirm against the code that fills this struct.
	downstreamID uint64
	// list of per-upstream LIST commands not yet sent or completed
	// NOTE(review): the int64 key presumably identifies the network; confirm.
	pendingCommands map[int64]*irc.Message
}
113
114func newUser(srv *Server, record *User) *user {
115 return &user{
116 User: *record,
117 srv: srv,
118 events: make(chan event, 64),
119 }
120}
121
122func (u *user) forEachNetwork(f func(*network)) {
123 for _, network := range u.networks {
124 f(network)
125 }
126}
127
128func (u *user) forEachUpstream(f func(uc *upstreamConn)) {
129 for _, network := range u.networks {
130 uc := network.upstream()
131 if uc == nil {
132 continue
133 }
134 f(uc)
135 }
136}
137
138func (u *user) forEachDownstream(f func(dc *downstreamConn)) {
139 for _, dc := range u.downstreamConns {
140 f(dc)
141 }
142}
143
144func (u *user) getNetwork(name string) *network {
145 for _, network := range u.networks {
146 if network.Addr == name {
147 return network
148 }
149 if network.Name != "" && network.Name == name {
150 return network
151 }
152 }
153 return nil
154}
155
156func (u *user) run() {
157 networks, err := u.srv.db.ListNetworks(u.Username)
158 if err != nil {
159 u.srv.Logger.Printf("failed to list networks for user %q: %v", u.Username, err)
160 return
161 }
162
163 for _, record := range networks {
164 network := newNetwork(u, &record)
165 u.networks = append(u.networks, network)
166
167 go network.run()
168 }
169
170 for e := range u.events {
171 switch e := e.(type) {
172 case eventUpstreamConnected:
173 uc := e.uc
174
175 uc.network.lock.Lock()
176 uc.network.conn = uc
177 uc.network.lock.Unlock()
178
179 uc.updateAway()
180 case eventUpstreamDisconnected:
181 uc := e.uc
182
183 uc.network.lock.Lock()
184 uc.network.conn = nil
185 uc.network.lock.Unlock()
186
187 for _, log := range uc.logs {
188 log.file.Close()
189 }
190
191 uc.endPendingLISTs(true)
192 case eventUpstreamMessage:
193 msg, uc := e.msg, e.uc
194 if uc.isClosed() {
195 uc.logger.Printf("ignoring message on closed connection: %v", msg)
196 break
197 }
198 if err := uc.handleMessage(msg); err != nil {
199 uc.logger.Printf("failed to handle message %q: %v", msg, err)
200 }
201 case eventDownstreamConnected:
202 dc := e.dc
203
204 if err := dc.welcome(); err != nil {
205 dc.logger.Printf("failed to handle new registered connection: %v", err)
206 break
207 }
208
209 u.downstreamConns = append(u.downstreamConns, dc)
210
211 u.forEachUpstream(func(uc *upstreamConn) {
212 uc.updateAway()
213 })
214 case eventDownstreamDisconnected:
215 dc := e.dc
216 for i := range u.downstreamConns {
217 if u.downstreamConns[i] == dc {
218 u.downstreamConns = append(u.downstreamConns[:i], u.downstreamConns[i+1:]...)
219 break
220 }
221 }
222
223 u.forEachUpstream(func(uc *upstreamConn) {
224 uc.updateAway()
225 })
226 case eventDownstreamMessage:
227 msg, dc := e.msg, e.dc
228 if dc.isClosed() {
229 dc.logger.Printf("ignoring message on closed connection: %v", msg)
230 break
231 }
232 err := dc.handleMessage(msg)
233 if ircErr, ok := err.(ircError); ok {
234 ircErr.Message.Prefix = dc.srv.prefix()
235 dc.SendMessage(ircErr.Message)
236 } else if err != nil {
237 dc.logger.Printf("failed to handle message %q: %v", msg, err)
238 dc.Close()
239 }
240 default:
241 u.srv.Logger.Printf("received unknown event type: %T", e)
242 }
243 }
244}
245
246func (u *user) createNetwork(net *Network) (*network, error) {
247 if net.ID != 0 {
248 panic("tried creating an already-existing network")
249 }
250
251 network := newNetwork(u, net)
252 err := u.srv.db.StoreNetwork(u.Username, &network.Network)
253 if err != nil {
254 return nil, err
255 }
256
257 u.forEachDownstream(func(dc *downstreamConn) {
258 if dc.network == nil {
259 dc.runNetwork(network, false)
260 }
261 })
262
263 u.networks = append(u.networks, network)
264
265 go network.run()
266 return network, nil
267}
Note: See TracBrowser for help on using the repository browser.