source: code/trunk/user.go@ 167

Last change on this file since 167 was 167, checked in by contact, 5 years ago

Add eventDownstreamDisconnected

This should remove the need for protecting user.downstreamConns with a
mutex.

File size: 4.5 KB
RevLine 
[101]1package soju
2
3import (
4 "sync"
5 "time"
[103]6
7 "gopkg.in/irc.v3"
[101]8)
9
// event is a value sent on a user's events channel and consumed by user.run.
// user.run dispatches on the concrete types eventUpstreamMessage,
// eventDownstreamMessage, eventDownstreamConnected and
// eventDownstreamDisconnected; any other type is logged as unknown.
type event interface{}
11
12type eventUpstreamMessage struct {
[103]13 msg *irc.Message
14 uc *upstreamConn
15}
16
[165]17type eventDownstreamMessage struct {
[103]18 msg *irc.Message
19 dc *downstreamConn
20}
21
[166]22type eventDownstreamConnected struct {
23 dc *downstreamConn
24}
25
[167]26type eventDownstreamDisconnected struct {
27 dc *downstreamConn
28}
29
[101]30type network struct {
31 Network
32 user *user
[143]33 ring *Ring
[131]34
35 lock sync.Mutex
36 conn *upstreamConn
37 history map[string]uint64
[101]38}
39
40func newNetwork(user *user, record *Network) *network {
41 return &network{
42 Network: *record,
43 user: user,
[143]44 ring: NewRing(user.srv.RingCap),
[131]45 history: make(map[string]uint64),
[101]46 }
47}
48
49func (net *network) run() {
50 var lastTry time.Time
51 for {
52 if dur := time.Now().Sub(lastTry); dur < retryConnectMinDelay {
53 delay := retryConnectMinDelay - dur
54 net.user.srv.Logger.Printf("waiting %v before trying to reconnect to %q", delay.Truncate(time.Second), net.Addr)
55 time.Sleep(delay)
56 }
57 lastTry = time.Now()
58
59 uc, err := connectToUpstream(net)
60 if err != nil {
61 net.user.srv.Logger.Printf("failed to connect to upstream server %q: %v", net.Addr, err)
62 continue
63 }
64
65 uc.register()
66
[131]67 net.lock.Lock()
[101]68 net.conn = uc
[131]69 net.lock.Unlock()
[101]70
[165]71 if err := uc.readMessages(net.user.events); err != nil {
[101]72 uc.logger.Printf("failed to handle messages: %v", err)
73 }
74 uc.Close()
75
[131]76 net.lock.Lock()
[101]77 net.conn = nil
[131]78 net.lock.Unlock()
[101]79 }
80}
81
[136]82func (net *network) upstream() *upstreamConn {
83 net.lock.Lock()
84 defer net.lock.Unlock()
85 return net.conn
86}
87
[101]88type user struct {
89 User
90 srv *Server
91
[165]92 events chan event
[103]93
[101]94 lock sync.Mutex
95 networks []*network
96 downstreamConns []*downstreamConn
97}
98
99func newUser(srv *Server, record *User) *user {
100 return &user{
[165]101 User: *record,
102 srv: srv,
103 events: make(chan event, 64),
[101]104 }
105}
106
107func (u *user) forEachNetwork(f func(*network)) {
108 u.lock.Lock()
109 for _, network := range u.networks {
110 f(network)
111 }
112 u.lock.Unlock()
113}
114
115func (u *user) forEachUpstream(f func(uc *upstreamConn)) {
116 u.lock.Lock()
117 for _, network := range u.networks {
[136]118 uc := network.upstream()
[101]119 if uc == nil || !uc.registered || uc.closed {
120 continue
121 }
122 f(uc)
123 }
124 u.lock.Unlock()
125}
126
127func (u *user) forEachDownstream(f func(dc *downstreamConn)) {
128 u.lock.Lock()
129 for _, dc := range u.downstreamConns {
130 f(dc)
131 }
132 u.lock.Unlock()
133}
134
135func (u *user) getNetwork(name string) *network {
136 for _, network := range u.networks {
137 if network.Addr == name {
138 return network
139 }
140 }
141 return nil
142}
143
144func (u *user) run() {
145 networks, err := u.srv.db.ListNetworks(u.Username)
146 if err != nil {
147 u.srv.Logger.Printf("failed to list networks for user %q: %v", u.Username, err)
148 return
149 }
150
151 u.lock.Lock()
152 for _, record := range networks {
153 network := newNetwork(u, &record)
154 u.networks = append(u.networks, network)
155
156 go network.run()
157 }
158 u.lock.Unlock()
[103]159
[165]160 for e := range u.events {
161 switch e := e.(type) {
162 case eventUpstreamMessage:
163 msg, uc := e.msg, e.uc
[133]164 if uc.closed {
165 uc.logger.Printf("ignoring message on closed connection: %v", msg)
166 break
167 }
[103]168 if err := uc.handleMessage(msg); err != nil {
169 uc.logger.Printf("failed to handle message %q: %v", msg, err)
170 }
[166]171 case eventDownstreamConnected:
172 dc := e.dc
173 u.lock.Lock()
174 u.downstreamConns = append(u.downstreamConns, dc)
175 u.lock.Unlock()
[167]176 case eventDownstreamDisconnected:
177 dc := e.dc
178 u.lock.Lock()
179 for i := range u.downstreamConns {
180 if u.downstreamConns[i] == dc {
181 u.downstreamConns = append(u.downstreamConns[:i], u.downstreamConns[i+1:]...)
182 break
183 }
184 }
185 u.lock.Unlock()
[165]186 case eventDownstreamMessage:
187 msg, dc := e.msg, e.dc
[133]188 if dc.isClosed() {
189 dc.logger.Printf("ignoring message on closed connection: %v", msg)
190 break
191 }
[103]192 err := dc.handleMessage(msg)
193 if ircErr, ok := err.(ircError); ok {
194 ircErr.Message.Prefix = dc.srv.prefix()
195 dc.SendMessage(ircErr.Message)
196 } else if err != nil {
197 dc.logger.Printf("failed to handle message %q: %v", msg, err)
198 dc.Close()
199 }
[165]200 default:
201 u.srv.Logger.Printf("received unknown event type: %T", e)
[103]202 }
203 }
[101]204}
205
[120]206func (u *user) createNetwork(net *Network) (*network, error) {
[144]207 if net.ID != 0 {
208 panic("tried creating an already-existing network")
209 }
210
[120]211 network := newNetwork(u, net)
[101]212 err := u.srv.db.StoreNetwork(u.Username, &network.Network)
213 if err != nil {
214 return nil, err
215 }
[144]216
217 u.forEachDownstream(func(dc *downstreamConn) {
218 if dc.network == nil {
219 dc.runNetwork(network, false)
220 }
221 })
222
[101]223 u.lock.Lock()
224 u.networks = append(u.networks, network)
225 u.lock.Unlock()
[144]226
[101]227 go network.run()
228 return network, nil
229}
Note: See TracBrowser for help on using the repository browser.