source: code/trunk/user.go@ 198

Last change on this file since 198 was 198, checked in by contact, 5 years ago

Auto away

Closes: https://todo.sr.ht/~emersion/soju/13

File size: 5.2 KB
RevLine 
[101]1package soju
2
3import (
4 "sync"
5 "time"
[103]6
7 "gopkg.in/irc.v3"
[101]8)
9
[165]10type event interface{}
11
12type eventUpstreamMessage struct {
[103]13 msg *irc.Message
14 uc *upstreamConn
15}
16
[196]17type eventUpstreamConnected struct {
18 uc *upstreamConn
19}
20
[179]21type eventUpstreamDisconnected struct {
22 uc *upstreamConn
23}
24
[165]25type eventDownstreamMessage struct {
[103]26 msg *irc.Message
27 dc *downstreamConn
28}
29
[166]30type eventDownstreamConnected struct {
31 dc *downstreamConn
32}
33
[167]34type eventDownstreamDisconnected struct {
35 dc *downstreamConn
36}
37
[101]38type network struct {
39 Network
40 user *user
[143]41 ring *Ring
[131]42
43 lock sync.Mutex
44 conn *upstreamConn
45 history map[string]uint64
[101]46}
47
48func newNetwork(user *user, record *Network) *network {
49 return &network{
50 Network: *record,
51 user: user,
[143]52 ring: NewRing(user.srv.RingCap),
[131]53 history: make(map[string]uint64),
[101]54 }
55}
56
57func (net *network) run() {
58 var lastTry time.Time
59 for {
60 if dur := time.Now().Sub(lastTry); dur < retryConnectMinDelay {
61 delay := retryConnectMinDelay - dur
62 net.user.srv.Logger.Printf("waiting %v before trying to reconnect to %q", delay.Truncate(time.Second), net.Addr)
63 time.Sleep(delay)
64 }
65 lastTry = time.Now()
66
67 uc, err := connectToUpstream(net)
68 if err != nil {
69 net.user.srv.Logger.Printf("failed to connect to upstream server %q: %v", net.Addr, err)
70 continue
71 }
72
73 uc.register()
[197]74 if err := uc.runUntilRegistered(); err != nil {
75 uc.logger.Printf("failed to register: %v", err)
76 uc.Close()
77 continue
78 }
[101]79
[131]80 net.lock.Lock()
[101]81 net.conn = uc
[131]82 net.lock.Unlock()
[101]83
[196]84 net.user.events <- eventUpstreamConnected{uc}
[165]85 if err := uc.readMessages(net.user.events); err != nil {
[101]86 uc.logger.Printf("failed to handle messages: %v", err)
87 }
88 uc.Close()
[179]89 net.user.events <- eventUpstreamDisconnected{uc}
[101]90
[131]91 net.lock.Lock()
[101]92 net.conn = nil
[131]93 net.lock.Unlock()
[101]94 }
95}
96
[136]97func (net *network) upstream() *upstreamConn {
98 net.lock.Lock()
99 defer net.lock.Unlock()
100 return net.conn
101}
102
[101]103type user struct {
104 User
105 srv *Server
106
[165]107 events chan event
[103]108
[101]109 networks []*network
110 downstreamConns []*downstreamConn
[177]111
112 // LIST commands in progress
[179]113 pendingLISTs []pendingLIST
[101]114}
115
[177]116type pendingLIST struct {
117 downstreamID uint64
118 // list of per-upstream LIST commands not yet sent or completed
119 pendingCommands map[int64]*irc.Message
120}
121
[101]122func newUser(srv *Server, record *User) *user {
123 return &user{
[165]124 User: *record,
125 srv: srv,
126 events: make(chan event, 64),
[101]127 }
128}
129
130func (u *user) forEachNetwork(f func(*network)) {
131 for _, network := range u.networks {
132 f(network)
133 }
134}
135
136func (u *user) forEachUpstream(f func(uc *upstreamConn)) {
137 for _, network := range u.networks {
[136]138 uc := network.upstream()
[197]139 if uc == nil {
[101]140 continue
141 }
142 f(uc)
143 }
144}
145
146func (u *user) forEachDownstream(f func(dc *downstreamConn)) {
147 for _, dc := range u.downstreamConns {
148 f(dc)
149 }
150}
151
152func (u *user) getNetwork(name string) *network {
153 for _, network := range u.networks {
154 if network.Addr == name {
155 return network
156 }
157 }
158 return nil
159}
160
161func (u *user) run() {
162 networks, err := u.srv.db.ListNetworks(u.Username)
163 if err != nil {
164 u.srv.Logger.Printf("failed to list networks for user %q: %v", u.Username, err)
165 return
166 }
167
168 for _, record := range networks {
169 network := newNetwork(u, &record)
170 u.networks = append(u.networks, network)
171
172 go network.run()
173 }
[103]174
[165]175 for e := range u.events {
176 switch e := e.(type) {
[196]177 case eventUpstreamConnected:
[198]178 uc := e.uc
179 uc.updateAway()
[179]180 case eventUpstreamDisconnected:
181 uc := e.uc
182 for _, log := range uc.logs {
183 log.file.Close()
184 }
[181]185 uc.endPendingLISTs(true)
[165]186 case eventUpstreamMessage:
187 msg, uc := e.msg, e.uc
[175]188 if uc.isClosed() {
[133]189 uc.logger.Printf("ignoring message on closed connection: %v", msg)
190 break
191 }
[103]192 if err := uc.handleMessage(msg); err != nil {
193 uc.logger.Printf("failed to handle message %q: %v", msg, err)
194 }
[166]195 case eventDownstreamConnected:
196 dc := e.dc
[168]197
198 if err := dc.welcome(); err != nil {
199 dc.logger.Printf("failed to handle new registered connection: %v", err)
200 break
201 }
202
[166]203 u.downstreamConns = append(u.downstreamConns, dc)
[198]204
205 u.forEachUpstream(func(uc *upstreamConn) {
206 uc.updateAway()
207 })
[167]208 case eventDownstreamDisconnected:
209 dc := e.dc
210 for i := range u.downstreamConns {
211 if u.downstreamConns[i] == dc {
212 u.downstreamConns = append(u.downstreamConns[:i], u.downstreamConns[i+1:]...)
213 break
214 }
215 }
[198]216
217 u.forEachUpstream(func(uc *upstreamConn) {
218 uc.updateAway()
219 })
[165]220 case eventDownstreamMessage:
221 msg, dc := e.msg, e.dc
[133]222 if dc.isClosed() {
223 dc.logger.Printf("ignoring message on closed connection: %v", msg)
224 break
225 }
[103]226 err := dc.handleMessage(msg)
227 if ircErr, ok := err.(ircError); ok {
228 ircErr.Message.Prefix = dc.srv.prefix()
229 dc.SendMessage(ircErr.Message)
230 } else if err != nil {
231 dc.logger.Printf("failed to handle message %q: %v", msg, err)
232 dc.Close()
233 }
[165]234 default:
235 u.srv.Logger.Printf("received unknown event type: %T", e)
[103]236 }
237 }
[101]238}
239
[120]240func (u *user) createNetwork(net *Network) (*network, error) {
[144]241 if net.ID != 0 {
242 panic("tried creating an already-existing network")
243 }
244
[120]245 network := newNetwork(u, net)
[101]246 err := u.srv.db.StoreNetwork(u.Username, &network.Network)
247 if err != nil {
248 return nil, err
249 }
[144]250
251 u.forEachDownstream(func(dc *downstreamConn) {
252 if dc.network == nil {
253 dc.runNetwork(network, false)
254 }
255 })
256
[101]257 u.networks = append(u.networks, network)
[144]258
[101]259 go network.run()
260 return network, nil
261}
Note: See TracBrowser for help on using the repository browser.