source: code/trunk/user.go@ 196

Last change on this file since 196 was 196, checked in by contact, 5 years ago

Add eventUpstreamConnected

This is used in the next commit.

File size: 5.1 KB
Line 
1package soju
2
3import (
4 "sync"
5 "time"
6
7 "gopkg.in/irc.v3"
8)
9
// event is the type of the values delivered on a user's events channel. It
// carries no methods; user.run tells the concrete event types apart with a
// type switch.
type event interface{}
11
12type eventUpstreamMessage struct {
13 msg *irc.Message
14 uc *upstreamConn
15}
16
17type eventUpstreamConnected struct {
18 uc *upstreamConn
19}
20
21type eventUpstreamDisconnected struct {
22 uc *upstreamConn
23}
24
25type eventDownstreamMessage struct {
26 msg *irc.Message
27 dc *downstreamConn
28}
29
30type eventDownstreamConnected struct {
31 dc *downstreamConn
32}
33
34type eventDownstreamDisconnected struct {
35 dc *downstreamConn
36}
37
38type network struct {
39 Network
40 user *user
41 ring *Ring
42
43 lock sync.Mutex
44 conn *upstreamConn
45 history map[string]uint64
46}
47
48func newNetwork(user *user, record *Network) *network {
49 return &network{
50 Network: *record,
51 user: user,
52 ring: NewRing(user.srv.RingCap),
53 history: make(map[string]uint64),
54 }
55}
56
57func (net *network) run() {
58 var lastTry time.Time
59 for {
60 if dur := time.Now().Sub(lastTry); dur < retryConnectMinDelay {
61 delay := retryConnectMinDelay - dur
62 net.user.srv.Logger.Printf("waiting %v before trying to reconnect to %q", delay.Truncate(time.Second), net.Addr)
63 time.Sleep(delay)
64 }
65 lastTry = time.Now()
66
67 uc, err := connectToUpstream(net)
68 if err != nil {
69 net.user.srv.Logger.Printf("failed to connect to upstream server %q: %v", net.Addr, err)
70 continue
71 }
72
73 uc.register()
74
75 // TODO: wait for the connection to be registered before adding it to
76 // net, otherwise messages might be sent to it while still being
77 // unauthenticated
78 net.lock.Lock()
79 net.conn = uc
80 net.lock.Unlock()
81
82 net.user.events <- eventUpstreamConnected{uc}
83 if err := uc.readMessages(net.user.events); err != nil {
84 uc.logger.Printf("failed to handle messages: %v", err)
85 }
86 uc.Close()
87 net.user.events <- eventUpstreamDisconnected{uc}
88
89 net.lock.Lock()
90 net.conn = nil
91 net.lock.Unlock()
92 }
93}
94
95func (net *network) upstream() *upstreamConn {
96 net.lock.Lock()
97 defer net.lock.Unlock()
98 return net.conn
99}
100
101type user struct {
102 User
103 srv *Server
104
105 events chan event
106
107 networks []*network
108 downstreamConns []*downstreamConn
109
110 // LIST commands in progress
111 pendingLISTs []pendingLIST
112}
113
114type pendingLIST struct {
115 downstreamID uint64
116 // list of per-upstream LIST commands not yet sent or completed
117 pendingCommands map[int64]*irc.Message
118}
119
120func newUser(srv *Server, record *User) *user {
121 return &user{
122 User: *record,
123 srv: srv,
124 events: make(chan event, 64),
125 }
126}
127
128func (u *user) forEachNetwork(f func(*network)) {
129 for _, network := range u.networks {
130 f(network)
131 }
132}
133
134func (u *user) forEachUpstream(f func(uc *upstreamConn)) {
135 for _, network := range u.networks {
136 uc := network.upstream()
137 if uc == nil || !uc.registered {
138 continue
139 }
140 f(uc)
141 }
142}
143
144func (u *user) forEachDownstream(f func(dc *downstreamConn)) {
145 for _, dc := range u.downstreamConns {
146 f(dc)
147 }
148}
149
150func (u *user) getNetwork(name string) *network {
151 for _, network := range u.networks {
152 if network.Addr == name {
153 return network
154 }
155 }
156 return nil
157}
158
159func (u *user) run() {
160 networks, err := u.srv.db.ListNetworks(u.Username)
161 if err != nil {
162 u.srv.Logger.Printf("failed to list networks for user %q: %v", u.Username, err)
163 return
164 }
165
166 for _, record := range networks {
167 network := newNetwork(u, &record)
168 u.networks = append(u.networks, network)
169
170 go network.run()
171 }
172
173 for e := range u.events {
174 switch e := e.(type) {
175 case eventUpstreamConnected:
176 // Nothing to do
177 case eventUpstreamDisconnected:
178 uc := e.uc
179 for _, log := range uc.logs {
180 log.file.Close()
181 }
182 uc.endPendingLISTs(true)
183 case eventUpstreamMessage:
184 msg, uc := e.msg, e.uc
185 if uc.isClosed() {
186 uc.logger.Printf("ignoring message on closed connection: %v", msg)
187 break
188 }
189 if err := uc.handleMessage(msg); err != nil {
190 uc.logger.Printf("failed to handle message %q: %v", msg, err)
191 }
192 case eventDownstreamConnected:
193 dc := e.dc
194
195 if err := dc.welcome(); err != nil {
196 dc.logger.Printf("failed to handle new registered connection: %v", err)
197 break
198 }
199
200 u.downstreamConns = append(u.downstreamConns, dc)
201 case eventDownstreamDisconnected:
202 dc := e.dc
203 for i := range u.downstreamConns {
204 if u.downstreamConns[i] == dc {
205 u.downstreamConns = append(u.downstreamConns[:i], u.downstreamConns[i+1:]...)
206 break
207 }
208 }
209 case eventDownstreamMessage:
210 msg, dc := e.msg, e.dc
211 if dc.isClosed() {
212 dc.logger.Printf("ignoring message on closed connection: %v", msg)
213 break
214 }
215 err := dc.handleMessage(msg)
216 if ircErr, ok := err.(ircError); ok {
217 ircErr.Message.Prefix = dc.srv.prefix()
218 dc.SendMessage(ircErr.Message)
219 } else if err != nil {
220 dc.logger.Printf("failed to handle message %q: %v", msg, err)
221 dc.Close()
222 }
223 default:
224 u.srv.Logger.Printf("received unknown event type: %T", e)
225 }
226 }
227}
228
229func (u *user) createNetwork(net *Network) (*network, error) {
230 if net.ID != 0 {
231 panic("tried creating an already-existing network")
232 }
233
234 network := newNetwork(u, net)
235 err := u.srv.db.StoreNetwork(u.Username, &network.Network)
236 if err != nil {
237 return nil, err
238 }
239
240 u.forEachDownstream(func(dc *downstreamConn) {
241 if dc.network == nil {
242 dc.runNetwork(network, false)
243 }
244 })
245
246 u.networks = append(u.networks, network)
247
248 go network.run()
249 return network, nil
250}
Note: See TracBrowser for help on using the repository browser.