source: code/trunk/user.go@ 196

Last change on this file since 196 was 196, checked in by contact, 5 years ago

Add eventUpstreamConnected

This is used in the next commit.

File size: 5.1 KB
RevLine 
[101]1package soju
2
3import (
4 "sync"
5 "time"
[103]6
7 "gopkg.in/irc.v3"
[101]8)
9
[165]10type event interface{}
11
12type eventUpstreamMessage struct {
[103]13 msg *irc.Message
14 uc *upstreamConn
15}
16
[196]17type eventUpstreamConnected struct {
18 uc *upstreamConn
19}
20
[179]21type eventUpstreamDisconnected struct {
22 uc *upstreamConn
23}
24
[165]25type eventDownstreamMessage struct {
[103]26 msg *irc.Message
27 dc *downstreamConn
28}
29
[166]30type eventDownstreamConnected struct {
31 dc *downstreamConn
32}
33
[167]34type eventDownstreamDisconnected struct {
35 dc *downstreamConn
36}
37
[101]38type network struct {
39 Network
40 user *user
[143]41 ring *Ring
[131]42
43 lock sync.Mutex
44 conn *upstreamConn
45 history map[string]uint64
[101]46}
47
48func newNetwork(user *user, record *Network) *network {
49 return &network{
50 Network: *record,
51 user: user,
[143]52 ring: NewRing(user.srv.RingCap),
[131]53 history: make(map[string]uint64),
[101]54 }
55}
56
57func (net *network) run() {
58 var lastTry time.Time
59 for {
60 if dur := time.Now().Sub(lastTry); dur < retryConnectMinDelay {
61 delay := retryConnectMinDelay - dur
62 net.user.srv.Logger.Printf("waiting %v before trying to reconnect to %q", delay.Truncate(time.Second), net.Addr)
63 time.Sleep(delay)
64 }
65 lastTry = time.Now()
66
67 uc, err := connectToUpstream(net)
68 if err != nil {
69 net.user.srv.Logger.Printf("failed to connect to upstream server %q: %v", net.Addr, err)
70 continue
71 }
72
73 uc.register()
74
[175]75 // TODO: wait for the connection to be registered before adding it to
76 // net, otherwise messages might be sent to it while still being
77 // unauthenticated
[131]78 net.lock.Lock()
[101]79 net.conn = uc
[131]80 net.lock.Unlock()
[101]81
[196]82 net.user.events <- eventUpstreamConnected{uc}
[165]83 if err := uc.readMessages(net.user.events); err != nil {
[101]84 uc.logger.Printf("failed to handle messages: %v", err)
85 }
86 uc.Close()
[179]87 net.user.events <- eventUpstreamDisconnected{uc}
[101]88
[131]89 net.lock.Lock()
[101]90 net.conn = nil
[131]91 net.lock.Unlock()
[101]92 }
93}
94
[136]95func (net *network) upstream() *upstreamConn {
96 net.lock.Lock()
97 defer net.lock.Unlock()
98 return net.conn
99}
100
[101]101type user struct {
102 User
103 srv *Server
104
[165]105 events chan event
[103]106
[101]107 networks []*network
108 downstreamConns []*downstreamConn
[177]109
110 // LIST commands in progress
[179]111 pendingLISTs []pendingLIST
[101]112}
113
[177]114type pendingLIST struct {
115 downstreamID uint64
116 // list of per-upstream LIST commands not yet sent or completed
117 pendingCommands map[int64]*irc.Message
118}
119
[101]120func newUser(srv *Server, record *User) *user {
121 return &user{
[165]122 User: *record,
123 srv: srv,
124 events: make(chan event, 64),
[101]125 }
126}
127
128func (u *user) forEachNetwork(f func(*network)) {
129 for _, network := range u.networks {
130 f(network)
131 }
132}
133
134func (u *user) forEachUpstream(f func(uc *upstreamConn)) {
135 for _, network := range u.networks {
[136]136 uc := network.upstream()
[175]137 if uc == nil || !uc.registered {
[101]138 continue
139 }
140 f(uc)
141 }
142}
143
144func (u *user) forEachDownstream(f func(dc *downstreamConn)) {
145 for _, dc := range u.downstreamConns {
146 f(dc)
147 }
148}
149
150func (u *user) getNetwork(name string) *network {
151 for _, network := range u.networks {
152 if network.Addr == name {
153 return network
154 }
155 }
156 return nil
157}
158
159func (u *user) run() {
160 networks, err := u.srv.db.ListNetworks(u.Username)
161 if err != nil {
162 u.srv.Logger.Printf("failed to list networks for user %q: %v", u.Username, err)
163 return
164 }
165
166 for _, record := range networks {
167 network := newNetwork(u, &record)
168 u.networks = append(u.networks, network)
169
170 go network.run()
171 }
[103]172
[165]173 for e := range u.events {
174 switch e := e.(type) {
[196]175 case eventUpstreamConnected:
176 // Nothing to do
[179]177 case eventUpstreamDisconnected:
178 uc := e.uc
179 for _, log := range uc.logs {
180 log.file.Close()
181 }
[181]182 uc.endPendingLISTs(true)
[165]183 case eventUpstreamMessage:
184 msg, uc := e.msg, e.uc
[175]185 if uc.isClosed() {
[133]186 uc.logger.Printf("ignoring message on closed connection: %v", msg)
187 break
188 }
[103]189 if err := uc.handleMessage(msg); err != nil {
190 uc.logger.Printf("failed to handle message %q: %v", msg, err)
191 }
[166]192 case eventDownstreamConnected:
193 dc := e.dc
[168]194
195 if err := dc.welcome(); err != nil {
196 dc.logger.Printf("failed to handle new registered connection: %v", err)
197 break
198 }
199
[166]200 u.downstreamConns = append(u.downstreamConns, dc)
[167]201 case eventDownstreamDisconnected:
202 dc := e.dc
203 for i := range u.downstreamConns {
204 if u.downstreamConns[i] == dc {
205 u.downstreamConns = append(u.downstreamConns[:i], u.downstreamConns[i+1:]...)
206 break
207 }
208 }
[165]209 case eventDownstreamMessage:
210 msg, dc := e.msg, e.dc
[133]211 if dc.isClosed() {
212 dc.logger.Printf("ignoring message on closed connection: %v", msg)
213 break
214 }
[103]215 err := dc.handleMessage(msg)
216 if ircErr, ok := err.(ircError); ok {
217 ircErr.Message.Prefix = dc.srv.prefix()
218 dc.SendMessage(ircErr.Message)
219 } else if err != nil {
220 dc.logger.Printf("failed to handle message %q: %v", msg, err)
221 dc.Close()
222 }
[165]223 default:
224 u.srv.Logger.Printf("received unknown event type: %T", e)
[103]225 }
226 }
[101]227}
228
[120]229func (u *user) createNetwork(net *Network) (*network, error) {
[144]230 if net.ID != 0 {
231 panic("tried creating an already-existing network")
232 }
233
[120]234 network := newNetwork(u, net)
[101]235 err := u.srv.db.StoreNetwork(u.Username, &network.Network)
236 if err != nil {
237 return nil, err
238 }
[144]239
240 u.forEachDownstream(func(dc *downstreamConn) {
241 if dc.network == nil {
242 dc.runNetwork(network, false)
243 }
244 })
245
[101]246 u.networks = append(u.networks, network)
[144]247
[101]248 go network.run()
249 return network, nil
250}
Note: See TracBrowser for help on using the repository browser.