source: code/trunk/user.go@ 165

Last change on this file since 165 was 165, checked in by contact, 5 years ago

Introduce a user.events channel

This makes it easy to add new event types, and also guarantees ordering
between different event types.

File size: 4.4 KB
Line 
1package soju
2
3import (
4 "sync"
5 "time"
6
7 "gopkg.in/irc.v3"
8)
9
// event is any value sent over a user's events channel. The concrete types
// that user.run knows how to handle are eventUpstreamMessage and
// eventDownstreamMessage; anything else is logged as unknown.
type event interface{}
11
12type eventUpstreamMessage struct {
13 msg *irc.Message
14 uc *upstreamConn
15}
16
17type eventDownstreamMessage struct {
18 msg *irc.Message
19 dc *downstreamConn
20}
21
22type network struct {
23 Network
24 user *user
25 ring *Ring
26
27 lock sync.Mutex
28 conn *upstreamConn
29 history map[string]uint64
30}
31
32func newNetwork(user *user, record *Network) *network {
33 return &network{
34 Network: *record,
35 user: user,
36 ring: NewRing(user.srv.RingCap),
37 history: make(map[string]uint64),
38 }
39}
40
41func (net *network) run() {
42 var lastTry time.Time
43 for {
44 if dur := time.Now().Sub(lastTry); dur < retryConnectMinDelay {
45 delay := retryConnectMinDelay - dur
46 net.user.srv.Logger.Printf("waiting %v before trying to reconnect to %q", delay.Truncate(time.Second), net.Addr)
47 time.Sleep(delay)
48 }
49 lastTry = time.Now()
50
51 uc, err := connectToUpstream(net)
52 if err != nil {
53 net.user.srv.Logger.Printf("failed to connect to upstream server %q: %v", net.Addr, err)
54 continue
55 }
56
57 uc.register()
58
59 net.lock.Lock()
60 net.conn = uc
61 net.lock.Unlock()
62
63 if err := uc.readMessages(net.user.events); err != nil {
64 uc.logger.Printf("failed to handle messages: %v", err)
65 }
66 uc.Close()
67
68 net.lock.Lock()
69 net.conn = nil
70 net.lock.Unlock()
71 }
72}
73
74func (net *network) upstream() *upstreamConn {
75 net.lock.Lock()
76 defer net.lock.Unlock()
77 return net.conn
78}
79
80type user struct {
81 User
82 srv *Server
83
84 events chan event
85
86 lock sync.Mutex
87 networks []*network
88 downstreamConns []*downstreamConn
89}
90
91func newUser(srv *Server, record *User) *user {
92 return &user{
93 User: *record,
94 srv: srv,
95 events: make(chan event, 64),
96 }
97}
98
99func (u *user) forEachNetwork(f func(*network)) {
100 u.lock.Lock()
101 for _, network := range u.networks {
102 f(network)
103 }
104 u.lock.Unlock()
105}
106
107func (u *user) forEachUpstream(f func(uc *upstreamConn)) {
108 u.lock.Lock()
109 for _, network := range u.networks {
110 uc := network.upstream()
111 if uc == nil || !uc.registered || uc.closed {
112 continue
113 }
114 f(uc)
115 }
116 u.lock.Unlock()
117}
118
119func (u *user) forEachDownstream(f func(dc *downstreamConn)) {
120 u.lock.Lock()
121 for _, dc := range u.downstreamConns {
122 f(dc)
123 }
124 u.lock.Unlock()
125}
126
127func (u *user) getNetwork(name string) *network {
128 for _, network := range u.networks {
129 if network.Addr == name {
130 return network
131 }
132 }
133 return nil
134}
135
136func (u *user) run() {
137 networks, err := u.srv.db.ListNetworks(u.Username)
138 if err != nil {
139 u.srv.Logger.Printf("failed to list networks for user %q: %v", u.Username, err)
140 return
141 }
142
143 u.lock.Lock()
144 for _, record := range networks {
145 network := newNetwork(u, &record)
146 u.networks = append(u.networks, network)
147
148 go network.run()
149 }
150 u.lock.Unlock()
151
152 for e := range u.events {
153 switch e := e.(type) {
154 case eventUpstreamMessage:
155 msg, uc := e.msg, e.uc
156 if uc.closed {
157 uc.logger.Printf("ignoring message on closed connection: %v", msg)
158 break
159 }
160 if err := uc.handleMessage(msg); err != nil {
161 uc.logger.Printf("failed to handle message %q: %v", msg, err)
162 }
163 case eventDownstreamMessage:
164 msg, dc := e.msg, e.dc
165 if dc.isClosed() {
166 dc.logger.Printf("ignoring message on closed connection: %v", msg)
167 break
168 }
169 err := dc.handleMessage(msg)
170 if ircErr, ok := err.(ircError); ok {
171 ircErr.Message.Prefix = dc.srv.prefix()
172 dc.SendMessage(ircErr.Message)
173 } else if err != nil {
174 dc.logger.Printf("failed to handle message %q: %v", msg, err)
175 dc.Close()
176 }
177 default:
178 u.srv.Logger.Printf("received unknown event type: %T", e)
179 }
180 }
181}
182
183func (u *user) addDownstream(dc *downstreamConn) (first bool) {
184 u.lock.Lock()
185 first = len(dc.user.downstreamConns) == 0
186 u.downstreamConns = append(u.downstreamConns, dc)
187 u.lock.Unlock()
188 return first
189}
190
191func (u *user) removeDownstream(dc *downstreamConn) {
192 u.lock.Lock()
193 for i := range u.downstreamConns {
194 if u.downstreamConns[i] == dc {
195 u.downstreamConns = append(u.downstreamConns[:i], u.downstreamConns[i+1:]...)
196 break
197 }
198 }
199 u.lock.Unlock()
200}
201
202func (u *user) createNetwork(net *Network) (*network, error) {
203 if net.ID != 0 {
204 panic("tried creating an already-existing network")
205 }
206
207 network := newNetwork(u, net)
208 err := u.srv.db.StoreNetwork(u.Username, &network.Network)
209 if err != nil {
210 return nil, err
211 }
212
213 u.forEachDownstream(func(dc *downstreamConn) {
214 if dc.network == nil {
215 dc.runNetwork(network, false)
216 }
217 })
218
219 u.lock.Lock()
220 u.networks = append(u.networks, network)
221 u.lock.Unlock()
222
223 go network.run()
224 return network, nil
225}
Note: See TracBrowser for help on using the repository browser.