source: code/trunk/user.go@ 167

Last change on this file since 167 was 167, checked in by contact, 5 years ago

Add eventDownstreamDisconnected

This should remove the need for protecting user.downstreamConns with a
mutex.

File size: 4.5 KB
Line 
1package soju
2
3import (
4 "sync"
5 "time"
6
7 "gopkg.in/irc.v3"
8)
9
// event is a marker for values sent on a user's event channel; the
// concrete type is switched on in (*user).run.
type event interface{}

// eventUpstreamMessage carries a message received from an upstream
// (IRC server) connection.
type eventUpstreamMessage struct {
	msg *irc.Message
	uc  *upstreamConn
}

// eventDownstreamMessage carries a message received from a downstream
// (client) connection.
type eventDownstreamMessage struct {
	msg *irc.Message
	dc  *downstreamConn
}

// eventDownstreamConnected announces a newly registered downstream
// connection so the user goroutine can start tracking it.
type eventDownstreamConnected struct {
	dc *downstreamConn
}

// eventDownstreamDisconnected announces that a downstream connection
// has gone away and must be removed from user.downstreamConns.
type eventDownstreamDisconnected struct {
	dc *downstreamConn
}
29
// network is one configured IRC network for a user: the stored Network
// record plus its runtime state.
type network struct {
	Network
	user *user
	ring *Ring

	lock    sync.Mutex
	conn    *upstreamConn // current upstream connection, nil while disconnected; guarded by lock
	history map[string]uint64 // presumably per-target ring positions — TODO confirm against Ring consumers
}
39
40func newNetwork(user *user, record *Network) *network {
41 return &network{
42 Network: *record,
43 user: user,
44 ring: NewRing(user.srv.RingCap),
45 history: make(map[string]uint64),
46 }
47}
48
49func (net *network) run() {
50 var lastTry time.Time
51 for {
52 if dur := time.Now().Sub(lastTry); dur < retryConnectMinDelay {
53 delay := retryConnectMinDelay - dur
54 net.user.srv.Logger.Printf("waiting %v before trying to reconnect to %q", delay.Truncate(time.Second), net.Addr)
55 time.Sleep(delay)
56 }
57 lastTry = time.Now()
58
59 uc, err := connectToUpstream(net)
60 if err != nil {
61 net.user.srv.Logger.Printf("failed to connect to upstream server %q: %v", net.Addr, err)
62 continue
63 }
64
65 uc.register()
66
67 net.lock.Lock()
68 net.conn = uc
69 net.lock.Unlock()
70
71 if err := uc.readMessages(net.user.events); err != nil {
72 uc.logger.Printf("failed to handle messages: %v", err)
73 }
74 uc.Close()
75
76 net.lock.Lock()
77 net.conn = nil
78 net.lock.Unlock()
79 }
80}
81
82func (net *network) upstream() *upstreamConn {
83 net.lock.Lock()
84 defer net.lock.Unlock()
85 return net.conn
86}
87
// user is one bouncer account at runtime: the stored User record, the
// event channel its goroutine consumes, and the connections it owns.
type user struct {
	User
	srv *Server

	// events serializes all message handling into the user goroutine
	// (see (*user).run).
	events chan event

	lock            sync.Mutex
	networks        []*network        // guarded by lock
	downstreamConns []*downstreamConn // guarded by lock; mutated only from the user goroutine
}
98
99func newUser(srv *Server, record *User) *user {
100 return &user{
101 User: *record,
102 srv: srv,
103 events: make(chan event, 64),
104 }
105}
106
107func (u *user) forEachNetwork(f func(*network)) {
108 u.lock.Lock()
109 for _, network := range u.networks {
110 f(network)
111 }
112 u.lock.Unlock()
113}
114
115func (u *user) forEachUpstream(f func(uc *upstreamConn)) {
116 u.lock.Lock()
117 for _, network := range u.networks {
118 uc := network.upstream()
119 if uc == nil || !uc.registered || uc.closed {
120 continue
121 }
122 f(uc)
123 }
124 u.lock.Unlock()
125}
126
127func (u *user) forEachDownstream(f func(dc *downstreamConn)) {
128 u.lock.Lock()
129 for _, dc := range u.downstreamConns {
130 f(dc)
131 }
132 u.lock.Unlock()
133}
134
135func (u *user) getNetwork(name string) *network {
136 for _, network := range u.networks {
137 if network.Addr == name {
138 return network
139 }
140 }
141 return nil
142}
143
// run is the user goroutine: it loads the user's networks from the
// database, starts one connection goroutine per network, then
// processes events serially until u.events is closed. Because all
// message handling happens here, upstream and downstream handlers need
// no locking among themselves.
func (u *user) run() {
	networks, err := u.srv.db.ListNetworks(u.Username)
	if err != nil {
		u.srv.Logger.Printf("failed to list networks for user %q: %v", u.Username, err)
		return
	}

	u.lock.Lock()
	for _, record := range networks {
		// &record is safe here: newNetwork copies the value
		// (Network: *record) before the next iteration reuses it.
		network := newNetwork(u, &record)
		u.networks = append(u.networks, network)

		go network.run()
	}
	u.lock.Unlock()

	for e := range u.events {
		switch e := e.(type) {
		case eventUpstreamMessage:
			msg, uc := e.msg, e.uc
			// The connection may have been closed between the
			// reader enqueueing the event and us handling it.
			if uc.closed {
				uc.logger.Printf("ignoring message on closed connection: %v", msg)
				break
			}
			if err := uc.handleMessage(msg); err != nil {
				uc.logger.Printf("failed to handle message %q: %v", msg, err)
			}
		case eventDownstreamConnected:
			dc := e.dc
			u.lock.Lock()
			u.downstreamConns = append(u.downstreamConns, dc)
			u.lock.Unlock()
		case eventDownstreamDisconnected:
			dc := e.dc
			// Remove the first (and only) occurrence of dc.
			u.lock.Lock()
			for i := range u.downstreamConns {
				if u.downstreamConns[i] == dc {
					u.downstreamConns = append(u.downstreamConns[:i], u.downstreamConns[i+1:]...)
					break
				}
			}
			u.lock.Unlock()
		case eventDownstreamMessage:
			msg, dc := e.msg, e.dc
			if dc.isClosed() {
				dc.logger.Printf("ignoring message on closed connection: %v", msg)
				break
			}
			err := dc.handleMessage(msg)
			if ircErr, ok := err.(ircError); ok {
				// Protocol-level errors are reported back to the
				// client instead of tearing down the connection.
				ircErr.Message.Prefix = dc.srv.prefix()
				dc.SendMessage(ircErr.Message)
			} else if err != nil {
				dc.logger.Printf("failed to handle message %q: %v", msg, err)
				dc.Close()
			}
		default:
			u.srv.Logger.Printf("received unknown event type: %T", e)
		}
	}
}
205
206func (u *user) createNetwork(net *Network) (*network, error) {
207 if net.ID != 0 {
208 panic("tried creating an already-existing network")
209 }
210
211 network := newNetwork(u, net)
212 err := u.srv.db.StoreNetwork(u.Username, &network.Network)
213 if err != nil {
214 return nil, err
215 }
216
217 u.forEachDownstream(func(dc *downstreamConn) {
218 if dc.network == nil {
219 dc.runNetwork(network, false)
220 }
221 })
222
223 u.lock.Lock()
224 u.networks = append(u.networks, network)
225 u.lock.Unlock()
226
227 go network.run()
228 return network, nil
229}
Note: See TracBrowser for help on using the repository browser.