source: code/trunk/user.go@ 166

Last change on this file since 166 was 166, checked in by contact, 5 years ago

Add eventDownstreamConnected

In a later commit, we'll be able to move part of downstreamConn.register
into the user goroutine to prevent races.

References: https://todo.sr.ht/~emersion/soju/22

File size: 4.4 KB
RevLine 
[101]1package soju
2
3import (
4 "sync"
5 "time"
[103]6
7 "gopkg.in/irc.v3"
[101]8)
9
[165]10type event interface{}
11
12type eventUpstreamMessage struct {
[103]13 msg *irc.Message
14 uc *upstreamConn
15}
16
[165]17type eventDownstreamMessage struct {
[103]18 msg *irc.Message
19 dc *downstreamConn
20}
21
[166]22type eventDownstreamConnected struct {
23 dc *downstreamConn
24}
25
[101]26type network struct {
27 Network
28 user *user
[143]29 ring *Ring
[131]30
31 lock sync.Mutex
32 conn *upstreamConn
33 history map[string]uint64
[101]34}
35
36func newNetwork(user *user, record *Network) *network {
37 return &network{
38 Network: *record,
39 user: user,
[143]40 ring: NewRing(user.srv.RingCap),
[131]41 history: make(map[string]uint64),
[101]42 }
43}
44
45func (net *network) run() {
46 var lastTry time.Time
47 for {
48 if dur := time.Now().Sub(lastTry); dur < retryConnectMinDelay {
49 delay := retryConnectMinDelay - dur
50 net.user.srv.Logger.Printf("waiting %v before trying to reconnect to %q", delay.Truncate(time.Second), net.Addr)
51 time.Sleep(delay)
52 }
53 lastTry = time.Now()
54
55 uc, err := connectToUpstream(net)
56 if err != nil {
57 net.user.srv.Logger.Printf("failed to connect to upstream server %q: %v", net.Addr, err)
58 continue
59 }
60
61 uc.register()
62
[131]63 net.lock.Lock()
[101]64 net.conn = uc
[131]65 net.lock.Unlock()
[101]66
[165]67 if err := uc.readMessages(net.user.events); err != nil {
[101]68 uc.logger.Printf("failed to handle messages: %v", err)
69 }
70 uc.Close()
71
[131]72 net.lock.Lock()
[101]73 net.conn = nil
[131]74 net.lock.Unlock()
[101]75 }
76}
77
[136]78func (net *network) upstream() *upstreamConn {
79 net.lock.Lock()
80 defer net.lock.Unlock()
81 return net.conn
82}
83
[101]84type user struct {
85 User
86 srv *Server
87
[165]88 events chan event
[103]89
[101]90 lock sync.Mutex
91 networks []*network
92 downstreamConns []*downstreamConn
93}
94
95func newUser(srv *Server, record *User) *user {
96 return &user{
[165]97 User: *record,
98 srv: srv,
99 events: make(chan event, 64),
[101]100 }
101}
102
103func (u *user) forEachNetwork(f func(*network)) {
104 u.lock.Lock()
105 for _, network := range u.networks {
106 f(network)
107 }
108 u.lock.Unlock()
109}
110
111func (u *user) forEachUpstream(f func(uc *upstreamConn)) {
112 u.lock.Lock()
113 for _, network := range u.networks {
[136]114 uc := network.upstream()
[101]115 if uc == nil || !uc.registered || uc.closed {
116 continue
117 }
118 f(uc)
119 }
120 u.lock.Unlock()
121}
122
123func (u *user) forEachDownstream(f func(dc *downstreamConn)) {
124 u.lock.Lock()
125 for _, dc := range u.downstreamConns {
126 f(dc)
127 }
128 u.lock.Unlock()
129}
130
131func (u *user) getNetwork(name string) *network {
132 for _, network := range u.networks {
133 if network.Addr == name {
134 return network
135 }
136 }
137 return nil
138}
139
140func (u *user) run() {
141 networks, err := u.srv.db.ListNetworks(u.Username)
142 if err != nil {
143 u.srv.Logger.Printf("failed to list networks for user %q: %v", u.Username, err)
144 return
145 }
146
147 u.lock.Lock()
148 for _, record := range networks {
149 network := newNetwork(u, &record)
150 u.networks = append(u.networks, network)
151
152 go network.run()
153 }
154 u.lock.Unlock()
[103]155
[165]156 for e := range u.events {
157 switch e := e.(type) {
158 case eventUpstreamMessage:
159 msg, uc := e.msg, e.uc
[133]160 if uc.closed {
161 uc.logger.Printf("ignoring message on closed connection: %v", msg)
162 break
163 }
[103]164 if err := uc.handleMessage(msg); err != nil {
165 uc.logger.Printf("failed to handle message %q: %v", msg, err)
166 }
[166]167 case eventDownstreamConnected:
168 dc := e.dc
169 u.lock.Lock()
170 u.downstreamConns = append(u.downstreamConns, dc)
171 u.lock.Unlock()
[165]172 case eventDownstreamMessage:
173 msg, dc := e.msg, e.dc
[133]174 if dc.isClosed() {
175 dc.logger.Printf("ignoring message on closed connection: %v", msg)
176 break
177 }
[103]178 err := dc.handleMessage(msg)
179 if ircErr, ok := err.(ircError); ok {
180 ircErr.Message.Prefix = dc.srv.prefix()
181 dc.SendMessage(ircErr.Message)
182 } else if err != nil {
183 dc.logger.Printf("failed to handle message %q: %v", msg, err)
184 dc.Close()
185 }
[165]186 default:
187 u.srv.Logger.Printf("received unknown event type: %T", e)
[103]188 }
189 }
[101]190}
191
[137]192func (u *user) removeDownstream(dc *downstreamConn) {
193 u.lock.Lock()
194 for i := range u.downstreamConns {
195 if u.downstreamConns[i] == dc {
196 u.downstreamConns = append(u.downstreamConns[:i], u.downstreamConns[i+1:]...)
197 break
198 }
199 }
200 u.lock.Unlock()
201}
202
[120]203func (u *user) createNetwork(net *Network) (*network, error) {
[144]204 if net.ID != 0 {
205 panic("tried creating an already-existing network")
206 }
207
[120]208 network := newNetwork(u, net)
[101]209 err := u.srv.db.StoreNetwork(u.Username, &network.Network)
210 if err != nil {
211 return nil, err
212 }
[144]213
214 u.forEachDownstream(func(dc *downstreamConn) {
215 if dc.network == nil {
216 dc.runNetwork(network, false)
217 }
218 })
219
[101]220 u.lock.Lock()
221 u.networks = append(u.networks, network)
222 u.lock.Unlock()
[144]223
[101]224 go network.run()
225 return network, nil
226}
Note: See TracBrowser for help on using the repository browser.