source: code/trunk/user.go@ 165

Last change on this file since 165 was 165, checked in by contact, 5 years ago

Introduce a user.events channel

This makes it easy to add new events, and also guarantees ordering
between different event types.

File size: 4.4 KB
RevLine 
[101]1package soju
2
3import (
4 "sync"
5 "time"
[103]6
7 "gopkg.in/irc.v3"
[101]8)
9
// event is the element type of the user.events channel. user.run
// type-switches on each received value; it handles eventUpstreamMessage and
// eventDownstreamMessage and logs any other dynamic type as unknown.
[165]10type event interface{}
11
// eventUpstreamMessage pairs an IRC message with the upstream connection it
// belongs to. user.run passes msg to uc.handleMessage, unless uc is already
// marked closed, in which case the message is logged and dropped.
// NOTE(review): presumably produced by upstreamConn.readMessages — confirm.
12type eventUpstreamMessage struct {
[103]13 msg *irc.Message
14 uc *upstreamConn
15}
16
// eventDownstreamMessage pairs an IRC message with the downstream connection
// it belongs to. user.run passes msg to dc.handleMessage, unless
// dc.isClosed() reports true, in which case the message is logged and dropped.
// NOTE(review): presumably produced by the downstream read loop — confirm.
[165]17type eventDownstreamMessage struct {
[103]18 msg *irc.Message
19 dc *downstreamConn
20}
21
// network is the runtime state attached to one Network record of a user.
// It embeds a copy of the record (see newNetwork) and points back to the
// owning user.
[101]22type network struct {
23 Network
24 user *user
// ring is created by newNetwork with capacity user.srv.RingCap.
[143]25 ring *Ring
[131]26
// lock is held by run and upstream whenever conn is read or written.
27 lock sync.Mutex
// conn is the current upstream connection; run sets it after registering
// and resets it to nil once the connection has been closed.
28 conn *upstreamConn
// history starts out as an empty map in newNetwork.
// NOTE(review): meaning of the string keys and uint64 values is not visible
// in this file — confirm against the users of this field.
29 history map[string]uint64
[101]30}
31
32func newNetwork(user *user, record *Network) *network {
33 return &network{
34 Network: *record,
35 user: user,
[143]36 ring: NewRing(user.srv.RingCap),
[131]37 history: make(map[string]uint64),
[101]38 }
39}
40
41func (net *network) run() {
42 var lastTry time.Time
43 for {
44 if dur := time.Now().Sub(lastTry); dur < retryConnectMinDelay {
45 delay := retryConnectMinDelay - dur
46 net.user.srv.Logger.Printf("waiting %v before trying to reconnect to %q", delay.Truncate(time.Second), net.Addr)
47 time.Sleep(delay)
48 }
49 lastTry = time.Now()
50
51 uc, err := connectToUpstream(net)
52 if err != nil {
53 net.user.srv.Logger.Printf("failed to connect to upstream server %q: %v", net.Addr, err)
54 continue
55 }
56
57 uc.register()
58
[131]59 net.lock.Lock()
[101]60 net.conn = uc
[131]61 net.lock.Unlock()
[101]62
[165]63 if err := uc.readMessages(net.user.events); err != nil {
[101]64 uc.logger.Printf("failed to handle messages: %v", err)
65 }
66 uc.Close()
67
[131]68 net.lock.Lock()
[101]69 net.conn = nil
[131]70 net.lock.Unlock()
[101]71 }
72}
73
[136]74func (net *network) upstream() *upstreamConn {
75 net.lock.Lock()
76 defer net.lock.Unlock()
77 return net.conn
78}
79
// user is the runtime state of one User record: a copy of the record, the
// owning server, the event queue drained by run, and the user's networks and
// downstream connections.
[101]80type user struct {
81 User
82 srv *Server
83
// events is created by newUser with a buffer of 64 and drained by run;
// network.run hands it to upstreamConn.readMessages.
[165]84 events chan event
[103]85
// lock is held by the forEach* helpers, run, addDownstream,
// removeDownstream and createNetwork while they touch networks or
// downstreamConns. getNetwork reads networks without taking it.
[101]86 lock sync.Mutex
87 networks []*network
88 downstreamConns []*downstreamConn
89}
90
91func newUser(srv *Server, record *User) *user {
92 return &user{
[165]93 User: *record,
94 srv: srv,
95 events: make(chan event, 64),
[101]96 }
97}
98
99func (u *user) forEachNetwork(f func(*network)) {
100 u.lock.Lock()
101 for _, network := range u.networks {
102 f(network)
103 }
104 u.lock.Unlock()
105}
106
107func (u *user) forEachUpstream(f func(uc *upstreamConn)) {
108 u.lock.Lock()
109 for _, network := range u.networks {
[136]110 uc := network.upstream()
[101]111 if uc == nil || !uc.registered || uc.closed {
112 continue
113 }
114 f(uc)
115 }
116 u.lock.Unlock()
117}
118
119func (u *user) forEachDownstream(f func(dc *downstreamConn)) {
120 u.lock.Lock()
121 for _, dc := range u.downstreamConns {
122 f(dc)
123 }
124 u.lock.Unlock()
125}
126
127func (u *user) getNetwork(name string) *network {
128 for _, network := range u.networks {
129 if network.Addr == name {
130 return network
131 }
132 }
133 return nil
134}
135
136func (u *user) run() {
137 networks, err := u.srv.db.ListNetworks(u.Username)
138 if err != nil {
139 u.srv.Logger.Printf("failed to list networks for user %q: %v", u.Username, err)
140 return
141 }
142
143 u.lock.Lock()
144 for _, record := range networks {
145 network := newNetwork(u, &record)
146 u.networks = append(u.networks, network)
147
148 go network.run()
149 }
150 u.lock.Unlock()
[103]151
[165]152 for e := range u.events {
153 switch e := e.(type) {
154 case eventUpstreamMessage:
155 msg, uc := e.msg, e.uc
[133]156 if uc.closed {
157 uc.logger.Printf("ignoring message on closed connection: %v", msg)
158 break
159 }
[103]160 if err := uc.handleMessage(msg); err != nil {
161 uc.logger.Printf("failed to handle message %q: %v", msg, err)
162 }
[165]163 case eventDownstreamMessage:
164 msg, dc := e.msg, e.dc
[133]165 if dc.isClosed() {
166 dc.logger.Printf("ignoring message on closed connection: %v", msg)
167 break
168 }
[103]169 err := dc.handleMessage(msg)
170 if ircErr, ok := err.(ircError); ok {
171 ircErr.Message.Prefix = dc.srv.prefix()
172 dc.SendMessage(ircErr.Message)
173 } else if err != nil {
174 dc.logger.Printf("failed to handle message %q: %v", msg, err)
175 dc.Close()
176 }
[165]177 default:
178 u.srv.Logger.Printf("received unknown event type: %T", e)
[103]179 }
180 }
[101]181}
182
[137]183func (u *user) addDownstream(dc *downstreamConn) (first bool) {
184 u.lock.Lock()
185 first = len(dc.user.downstreamConns) == 0
186 u.downstreamConns = append(u.downstreamConns, dc)
187 u.lock.Unlock()
188 return first
189}
190
191func (u *user) removeDownstream(dc *downstreamConn) {
192 u.lock.Lock()
193 for i := range u.downstreamConns {
194 if u.downstreamConns[i] == dc {
195 u.downstreamConns = append(u.downstreamConns[:i], u.downstreamConns[i+1:]...)
196 break
197 }
198 }
199 u.lock.Unlock()
200}
201
[120]202func (u *user) createNetwork(net *Network) (*network, error) {
[144]203 if net.ID != 0 {
204 panic("tried creating an already-existing network")
205 }
206
[120]207 network := newNetwork(u, net)
[101]208 err := u.srv.db.StoreNetwork(u.Username, &network.Network)
209 if err != nil {
210 return nil, err
211 }
[144]212
213 u.forEachDownstream(func(dc *downstreamConn) {
214 if dc.network == nil {
215 dc.runNetwork(network, false)
216 }
217 })
218
[101]219 u.lock.Lock()
220 u.networks = append(u.networks, network)
221 u.lock.Unlock()
[144]222
[101]223 go network.run()
224 return network, nil
225}
Note: See TracBrowser for help on using the repository browser.