source: code/trunk/user.go@ 179

Last change on this file since 179 was 179, checked in by contact, 5 years ago

Introduce eventUpstreamDisconnected

This allows us to perform cleanup actions in the user goroutine. This
removes the need for pendingLISTsLock.

File size: 5.0 KB
RevLine 
[101]1package soju
2
3import (
4 "sync"
5 "time"
[103]6
7 "gopkg.in/irc.v3"
[101]8)
9
[165]10type event interface{}
11
12type eventUpstreamMessage struct {
[103]13 msg *irc.Message
14 uc *upstreamConn
15}
16
[179]17type eventUpstreamDisconnected struct {
18 uc *upstreamConn
19}
20
[165]21type eventDownstreamMessage struct {
[103]22 msg *irc.Message
23 dc *downstreamConn
24}
25
[166]26type eventDownstreamConnected struct {
27 dc *downstreamConn
28}
29
[167]30type eventDownstreamDisconnected struct {
31 dc *downstreamConn
32}
33
[101]34type network struct {
35 Network
36 user *user
[143]37 ring *Ring
[131]38
39 lock sync.Mutex
40 conn *upstreamConn
41 history map[string]uint64
[101]42}
43
44func newNetwork(user *user, record *Network) *network {
45 return &network{
46 Network: *record,
47 user: user,
[143]48 ring: NewRing(user.srv.RingCap),
[131]49 history: make(map[string]uint64),
[101]50 }
51}
52
53func (net *network) run() {
54 var lastTry time.Time
55 for {
56 if dur := time.Now().Sub(lastTry); dur < retryConnectMinDelay {
57 delay := retryConnectMinDelay - dur
58 net.user.srv.Logger.Printf("waiting %v before trying to reconnect to %q", delay.Truncate(time.Second), net.Addr)
59 time.Sleep(delay)
60 }
61 lastTry = time.Now()
62
63 uc, err := connectToUpstream(net)
64 if err != nil {
65 net.user.srv.Logger.Printf("failed to connect to upstream server %q: %v", net.Addr, err)
66 continue
67 }
68
69 uc.register()
70
[175]71 // TODO: wait for the connection to be registered before adding it to
72 // net, otherwise messages might be sent to it while still being
73 // unauthenticated
[131]74 net.lock.Lock()
[101]75 net.conn = uc
[131]76 net.lock.Unlock()
[101]77
[165]78 if err := uc.readMessages(net.user.events); err != nil {
[101]79 uc.logger.Printf("failed to handle messages: %v", err)
80 }
81 uc.Close()
[179]82 net.user.events <- eventUpstreamDisconnected{uc}
[101]83
[131]84 net.lock.Lock()
[101]85 net.conn = nil
[131]86 net.lock.Unlock()
[101]87 }
88}
89
[136]90func (net *network) upstream() *upstreamConn {
91 net.lock.Lock()
92 defer net.lock.Unlock()
93 return net.conn
94}
95
[101]96type user struct {
97 User
98 srv *Server
99
[165]100 events chan event
[103]101
[101]102 networks []*network
103 downstreamConns []*downstreamConn
[177]104
105 // LIST commands in progress
[179]106 pendingLISTs []pendingLIST
[101]107}
108
[177]109type pendingLIST struct {
110 downstreamID uint64
111 // list of per-upstream LIST commands not yet sent or completed
112 pendingCommands map[int64]*irc.Message
113}
114
[101]115func newUser(srv *Server, record *User) *user {
116 return &user{
[165]117 User: *record,
118 srv: srv,
119 events: make(chan event, 64),
[101]120 }
121}
122
123func (u *user) forEachNetwork(f func(*network)) {
124 for _, network := range u.networks {
125 f(network)
126 }
127}
128
129func (u *user) forEachUpstream(f func(uc *upstreamConn)) {
130 for _, network := range u.networks {
[136]131 uc := network.upstream()
[175]132 if uc == nil || !uc.registered {
[101]133 continue
134 }
135 f(uc)
136 }
137}
138
139func (u *user) forEachDownstream(f func(dc *downstreamConn)) {
140 for _, dc := range u.downstreamConns {
141 f(dc)
142 }
143}
144
145func (u *user) getNetwork(name string) *network {
146 for _, network := range u.networks {
147 if network.Addr == name {
148 return network
149 }
150 }
151 return nil
152}
153
154func (u *user) run() {
155 networks, err := u.srv.db.ListNetworks(u.Username)
156 if err != nil {
157 u.srv.Logger.Printf("failed to list networks for user %q: %v", u.Username, err)
158 return
159 }
160
161 for _, record := range networks {
162 network := newNetwork(u, &record)
163 u.networks = append(u.networks, network)
164
165 go network.run()
166 }
[103]167
[165]168 for e := range u.events {
169 switch e := e.(type) {
[179]170 case eventUpstreamDisconnected:
171 uc := e.uc
172 for _, log := range uc.logs {
173 log.file.Close()
174 }
175 uc.endPendingLists(true)
[165]176 case eventUpstreamMessage:
177 msg, uc := e.msg, e.uc
[175]178 if uc.isClosed() {
[133]179 uc.logger.Printf("ignoring message on closed connection: %v", msg)
180 break
181 }
[103]182 if err := uc.handleMessage(msg); err != nil {
183 uc.logger.Printf("failed to handle message %q: %v", msg, err)
184 }
[166]185 case eventDownstreamConnected:
186 dc := e.dc
[168]187
188 if err := dc.welcome(); err != nil {
189 dc.logger.Printf("failed to handle new registered connection: %v", err)
190 break
191 }
192
[166]193 u.downstreamConns = append(u.downstreamConns, dc)
[167]194 case eventDownstreamDisconnected:
195 dc := e.dc
196 for i := range u.downstreamConns {
197 if u.downstreamConns[i] == dc {
198 u.downstreamConns = append(u.downstreamConns[:i], u.downstreamConns[i+1:]...)
199 break
200 }
201 }
[165]202 case eventDownstreamMessage:
203 msg, dc := e.msg, e.dc
[133]204 if dc.isClosed() {
205 dc.logger.Printf("ignoring message on closed connection: %v", msg)
206 break
207 }
[103]208 err := dc.handleMessage(msg)
209 if ircErr, ok := err.(ircError); ok {
210 ircErr.Message.Prefix = dc.srv.prefix()
211 dc.SendMessage(ircErr.Message)
212 } else if err != nil {
213 dc.logger.Printf("failed to handle message %q: %v", msg, err)
214 dc.Close()
215 }
[165]216 default:
217 u.srv.Logger.Printf("received unknown event type: %T", e)
[103]218 }
219 }
[101]220}
221
[120]222func (u *user) createNetwork(net *Network) (*network, error) {
[144]223 if net.ID != 0 {
224 panic("tried creating an already-existing network")
225 }
226
[120]227 network := newNetwork(u, net)
[101]228 err := u.srv.db.StoreNetwork(u.Username, &network.Network)
229 if err != nil {
230 return nil, err
231 }
[144]232
233 u.forEachDownstream(func(dc *downstreamConn) {
234 if dc.network == nil {
235 dc.runNetwork(network, false)
236 }
237 })
238
[101]239 u.networks = append(u.networks, network)
[144]240
[101]241 go network.run()
242 return network, nil
243}
Note: See TracBrowser for help on using the repository browser.