source: code/trunk/user.go@ 149

Last change on this file since 149 was 144, checked in by contact, 5 years ago

Consume ring buffer for networks added on-the-fly

File size: 4.6 KB
RevLine 
[101]1package soju
2
3import (
4 "sync"
5 "time"
[103]6
7 "gopkg.in/irc.v3"
[101]8)
9
[103]10type upstreamIncomingMessage struct {
11 msg *irc.Message
12 uc *upstreamConn
13}
14
15type downstreamIncomingMessage struct {
16 msg *irc.Message
17 dc *downstreamConn
18}
19
[101]20type network struct {
21 Network
22 user *user
[143]23 ring *Ring
[131]24
25 lock sync.Mutex
26 conn *upstreamConn
27 history map[string]uint64
[101]28}
29
30func newNetwork(user *user, record *Network) *network {
31 return &network{
32 Network: *record,
33 user: user,
[143]34 ring: NewRing(user.srv.RingCap),
[131]35 history: make(map[string]uint64),
[101]36 }
37}
38
39func (net *network) run() {
40 var lastTry time.Time
41 for {
42 if dur := time.Now().Sub(lastTry); dur < retryConnectMinDelay {
43 delay := retryConnectMinDelay - dur
44 net.user.srv.Logger.Printf("waiting %v before trying to reconnect to %q", delay.Truncate(time.Second), net.Addr)
45 time.Sleep(delay)
46 }
47 lastTry = time.Now()
48
49 uc, err := connectToUpstream(net)
50 if err != nil {
51 net.user.srv.Logger.Printf("failed to connect to upstream server %q: %v", net.Addr, err)
52 continue
53 }
54
55 uc.register()
56
[131]57 net.lock.Lock()
[101]58 net.conn = uc
[131]59 net.lock.Unlock()
[101]60
[103]61 if err := uc.readMessages(net.user.upstreamIncoming); err != nil {
[101]62 uc.logger.Printf("failed to handle messages: %v", err)
63 }
64 uc.Close()
65
[131]66 net.lock.Lock()
[101]67 net.conn = nil
[131]68 net.lock.Unlock()
[101]69 }
70}
71
[136]72func (net *network) upstream() *upstreamConn {
73 net.lock.Lock()
74 defer net.lock.Unlock()
75 return net.conn
76}
77
[101]78type user struct {
79 User
80 srv *Server
81
[103]82 upstreamIncoming chan upstreamIncomingMessage
83 downstreamIncoming chan downstreamIncomingMessage
84
[101]85 lock sync.Mutex
86 networks []*network
87 downstreamConns []*downstreamConn
88}
89
90func newUser(srv *Server, record *User) *user {
91 return &user{
[103]92 User: *record,
93 srv: srv,
94 upstreamIncoming: make(chan upstreamIncomingMessage, 64),
95 downstreamIncoming: make(chan downstreamIncomingMessage, 64),
[101]96 }
97}
98
99func (u *user) forEachNetwork(f func(*network)) {
100 u.lock.Lock()
101 for _, network := range u.networks {
102 f(network)
103 }
104 u.lock.Unlock()
105}
106
107func (u *user) forEachUpstream(f func(uc *upstreamConn)) {
108 u.lock.Lock()
109 for _, network := range u.networks {
[136]110 uc := network.upstream()
[101]111 if uc == nil || !uc.registered || uc.closed {
112 continue
113 }
114 f(uc)
115 }
116 u.lock.Unlock()
117}
118
119func (u *user) forEachDownstream(f func(dc *downstreamConn)) {
120 u.lock.Lock()
121 for _, dc := range u.downstreamConns {
122 f(dc)
123 }
124 u.lock.Unlock()
125}
126
127func (u *user) getNetwork(name string) *network {
128 for _, network := range u.networks {
129 if network.Addr == name {
130 return network
131 }
132 }
133 return nil
134}
135
136func (u *user) run() {
137 networks, err := u.srv.db.ListNetworks(u.Username)
138 if err != nil {
139 u.srv.Logger.Printf("failed to list networks for user %q: %v", u.Username, err)
140 return
141 }
142
143 u.lock.Lock()
144 for _, record := range networks {
145 network := newNetwork(u, &record)
146 u.networks = append(u.networks, network)
147
148 go network.run()
149 }
150 u.lock.Unlock()
[103]151
152 for {
153 select {
154 case upstreamMsg := <-u.upstreamIncoming:
155 msg, uc := upstreamMsg.msg, upstreamMsg.uc
[133]156 if uc.closed {
157 uc.logger.Printf("ignoring message on closed connection: %v", msg)
158 break
159 }
[103]160 if err := uc.handleMessage(msg); err != nil {
161 uc.logger.Printf("failed to handle message %q: %v", msg, err)
162 }
163 case downstreamMsg := <-u.downstreamIncoming:
164 msg, dc := downstreamMsg.msg, downstreamMsg.dc
[133]165 if dc.isClosed() {
166 dc.logger.Printf("ignoring message on closed connection: %v", msg)
167 break
168 }
[103]169 err := dc.handleMessage(msg)
170 if ircErr, ok := err.(ircError); ok {
171 ircErr.Message.Prefix = dc.srv.prefix()
172 dc.SendMessage(ircErr.Message)
173 } else if err != nil {
174 dc.logger.Printf("failed to handle message %q: %v", msg, err)
175 dc.Close()
176 }
177 }
178 }
[101]179}
180
[137]181func (u *user) addDownstream(dc *downstreamConn) (first bool) {
182 u.lock.Lock()
183 first = len(dc.user.downstreamConns) == 0
184 u.downstreamConns = append(u.downstreamConns, dc)
185 u.lock.Unlock()
186 return first
187}
188
189func (u *user) removeDownstream(dc *downstreamConn) {
190 u.lock.Lock()
191 for i := range u.downstreamConns {
192 if u.downstreamConns[i] == dc {
193 u.downstreamConns = append(u.downstreamConns[:i], u.downstreamConns[i+1:]...)
194 break
195 }
196 }
197 u.lock.Unlock()
198}
199
[120]200func (u *user) createNetwork(net *Network) (*network, error) {
[144]201 if net.ID != 0 {
202 panic("tried creating an already-existing network")
203 }
204
[120]205 network := newNetwork(u, net)
[101]206 err := u.srv.db.StoreNetwork(u.Username, &network.Network)
207 if err != nil {
208 return nil, err
209 }
[144]210
211 u.forEachDownstream(func(dc *downstreamConn) {
212 if dc.network == nil {
213 dc.runNetwork(network, false)
214 }
215 })
216
[101]217 u.lock.Lock()
218 u.networks = append(u.networks, network)
219 u.lock.Unlock()
[144]220
[101]221 go network.run()
222 return network, nil
223}
Note: See TracBrowser for help on using the repository browser.