source: code/trunk/user.go@177

Last change on this file since 177 was 177, checked in by delthas, 5 years ago

Add LIST support

This commit adds support for downstream LIST messages from multiple
concurrent downstreams to multiple concurrent upstreams, including
support for multiple pending LIST requests from the same downstream.

Because a unique RPL_LISTEND message must be sent to the requesting
downstream, and there might be multiple upstreams each sending their own
RPL_LISTEND, a cache of RPL_LISTEND replies of some sort is required to
group the RPL_LISTENDs together, so that only one is sent back
downstream.

This commit adds a list of "pending LIST" structs, each containing a
map of all upstreams that still need to send an RPL_LISTEND, and the
corresponding LIST request associated with that response. This list of
pending LISTs is sorted in the order the requesting downstreams sent
their LIST messages. Each pending LIST struct also stores the id of the
requesting downstream, so that the replies are forwarded to it and to no
other downstream. (This is important because LIST replies can typically
amount to several thousand messages on large servers.)
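
As an illustration, a minimal sketch of how such a request could be
registered against the pendingLIST struct defined in this file (dc.id,
uc.network.ID and the registerPendingLIST helper are assumed names for
this sketch, not necessarily what the commit itself uses):

    // Sketch only, assuming dc.id (uint64) and uc.network.ID (int64)
    // fields; registerPendingLIST is a hypothetical helper.
    func (u *user) registerPendingLIST(dc *downstreamConn, msg *irc.Message) {
        u.pendingLISTsLock.Lock()
        defer u.pendingLISTsLock.Unlock()

        pl := pendingLIST{
            downstreamID:    dc.id,
            pendingCommands: make(map[int64]*irc.Message),
        }
        // one pending LIST command per connected upstream
        u.forEachUpstream(func(uc *upstreamConn) {
            pl.pendingCommands[uc.network.ID] = msg
        })
        u.pendingLISTs = append(u.pendingLISTs, pl)
    }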

When a single downstream makes multiple LIST requests, only the first
one will be immediately sent to the upstream servers. The next ones will
be buffered until the first one is completed. Distinct downstreams can
make concurrent LIST requests without any request buffering.
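
A sketch of that buffering rule (illustrative only; eligibleLIST is a
hypothetical helper, and tracking of commands already sent is omitted):
for a given upstream, only the oldest pending LIST of each downstream
may supply a command.

    // Sketch only: return the LIST command that downstreamID may send to
    // upstream netID, or nil if it is buffered behind an older request.
    func (u *user) eligibleLIST(netID int64, downstreamID uint64) *irc.Message {
        u.pendingLISTsLock.Lock()
        defer u.pendingLISTsLock.Unlock()

        for i := range u.pendingLISTs {
            pl := &u.pendingLISTs[i]
            if pl.downstreamID != downstreamID {
                continue
            }
            // first (oldest) pending LIST of this downstream; younger
            // requests stay buffered until completion deletes this entry
            return pl.pendingCommands[netID] // nil once completed
        }
        return nil
    }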

Each RPL_LIST message is forwarded to the downstream of the first
matching pending LIST struct.
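
A sketch of that lookup (listDownstream is a hypothetical helper name):

    // Sketch only: RPL_LIST replies from upstream netID belong to the
    // downstream of the first pending LIST struct still involving it.
    func (u *user) listDownstream(netID int64) (uint64, bool) {
        u.pendingLISTsLock.Lock()
        defer u.pendingLISTsLock.Unlock()

        for _, pl := range u.pendingLISTs {
            if _, found := pl.pendingCommands[netID]; found {
                return pl.downstreamID, true
            }
        }
        return 0, false
    }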

When an upstream sends an RPL_LISTEND message, the upstream is removed
from the first matching pending LIST struct, but that message is not
immediately forwarded downstream. If the map of pending upstreams in
that struct is then empty, all upstreams have sent back their
RPL_LISTEND replies (which means they have also sent all their RPL_LIST
replies); a single RPL_LISTEND is then sent downstream and that pending
LIST struct is removed from the cache.
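
A sketch of that completion step (completeLIST is a hypothetical helper;
actually sending the RPL_LISTEND to the downstream is left to the caller):

    // Sketch only: absorb an RPL_LISTEND from upstream netID; report
    // whether the oldest matching pending LIST is now complete, in which
    // case the caller emits the unique RPL_LISTEND to downstreamID.
    func (u *user) completeLIST(netID int64) (downstreamID uint64, done bool) {
        u.pendingLISTsLock.Lock()
        defer u.pendingLISTsLock.Unlock()

        for i := range u.pendingLISTs {
            pl := &u.pendingLISTs[i]
            if _, found := pl.pendingCommands[netID]; !found {
                continue
            }
            delete(pl.pendingCommands, netID)
            if len(pl.pendingCommands) == 0 {
                // every upstream has replied: drop the struct and tell
                // the caller to send the single downstream RPL_LISTEND
                downstreamID = pl.downstreamID
                u.pendingLISTs = append(u.pendingLISTs[:i], u.pendingLISTs[i+1:]...)
                return downstreamID, true
            }
            return 0, false
        }
        return 0, false
    }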

Upstreams are removed from the pending LIST structs in two other cases
(see the sketch after this list):

  • when they are closed (to avoid stalling because of a disconnected
    upstream that will never reply to the LIST message): they are removed
    from all pending LIST structs

  • when they reply with an ERR_UNKNOWNCOMMAND or RPL_TRYAGAIN LIST
    reply, which is typically sent when a user is not allowed to LIST
    because they just joined the server: they are removed from the first
    matching pending LIST struct, as if an RPL_LISTEND message had been
    received
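
A sketch of the first case (dropUpstreamLISTs is a hypothetical helper;
the second case instead behaves like the completion sketch above,
touching only the first matching struct):

    // Sketch only: when an upstream connection closes, remove it from
    // every pending LIST struct so no downstream stalls waiting for its
    // RPL_LISTEND; structs that become empty complete immediately.
    func (u *user) dropUpstreamLISTs(netID int64) {
        u.pendingLISTsLock.Lock()
        defer u.pendingLISTsLock.Unlock()

        for i := 0; i < len(u.pendingLISTs); {
            pl := &u.pendingLISTs[i]
            delete(pl.pendingCommands, netID)
            if len(pl.pendingCommands) == 0 {
                // the unique RPL_LISTEND would be sent to pl.downstreamID here
                u.pendingLISTs = append(u.pendingLISTs[:i], u.pendingLISTs[i+1:]...)
                continue
            }
            i++
        }
    }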

File size: 4.8 KB
package soju

import (
	"sync"
	"time"

	"gopkg.in/irc.v3"
)

type event interface{}

type eventUpstreamMessage struct {
	msg *irc.Message
	uc  *upstreamConn
}

type eventDownstreamMessage struct {
	msg *irc.Message
	dc  *downstreamConn
}

type eventDownstreamConnected struct {
	dc *downstreamConn
}

type eventDownstreamDisconnected struct {
	dc *downstreamConn
}

type network struct {
	Network
	user *user
	ring *Ring

	lock    sync.Mutex
	conn    *upstreamConn
	history map[string]uint64
}

func newNetwork(user *user, record *Network) *network {
	return &network{
		Network: *record,
		user:    user,
		ring:    NewRing(user.srv.RingCap),
		history: make(map[string]uint64),
	}
}

// run connects the network to its upstream server, and reconnects with a
// minimum delay when the connection fails or drops.
func (net *network) run() {
	var lastTry time.Time
	for {
		if dur := time.Now().Sub(lastTry); dur < retryConnectMinDelay {
			delay := retryConnectMinDelay - dur
			net.user.srv.Logger.Printf("waiting %v before trying to reconnect to %q", delay.Truncate(time.Second), net.Addr)
			time.Sleep(delay)
		}
		lastTry = time.Now()

		uc, err := connectToUpstream(net)
		if err != nil {
			net.user.srv.Logger.Printf("failed to connect to upstream server %q: %v", net.Addr, err)
			continue
		}

		uc.register()

		// TODO: wait for the connection to be registered before adding it to
		// net, otherwise messages might be sent to it while still being
		// unauthenticated
		net.lock.Lock()
		net.conn = uc
		net.lock.Unlock()

		if err := uc.readMessages(net.user.events); err != nil {
			uc.logger.Printf("failed to handle messages: %v", err)
		}
		uc.Close()

		net.lock.Lock()
		net.conn = nil
		net.lock.Unlock()
	}
}

// upstream returns the current upstream connection, or nil if the network
// is not connected.
func (net *network) upstream() *upstreamConn {
	net.lock.Lock()
	defer net.lock.Unlock()
	return net.conn
}

type user struct {
	User
	srv *Server

	events chan event

	networks        []*network
	downstreamConns []*downstreamConn

	// LIST commands in progress
	pendingLISTsLock sync.Mutex
	pendingLISTs     []pendingLIST
}

type pendingLIST struct {
	downstreamID uint64
	// list of per-upstream LIST commands not yet sent or completed
	pendingCommands map[int64]*irc.Message
}

func newUser(srv *Server, record *User) *user {
	return &user{
		User:   *record,
		srv:    srv,
		events: make(chan event, 64),
	}
}

func (u *user) forEachNetwork(f func(*network)) {
	for _, network := range u.networks {
		f(network)
	}
}

func (u *user) forEachUpstream(f func(uc *upstreamConn)) {
	for _, network := range u.networks {
		uc := network.upstream()
		if uc == nil || !uc.registered {
			continue
		}
		f(uc)
	}
}

func (u *user) forEachDownstream(f func(dc *downstreamConn)) {
	for _, dc := range u.downstreamConns {
		f(dc)
	}
}

func (u *user) getNetwork(name string) *network {
	for _, network := range u.networks {
		if network.Addr == name {
			return network
		}
	}
	return nil
}

// run loads and starts the user's networks, then dispatches events from the
// user's event channel.
func (u *user) run() {
	networks, err := u.srv.db.ListNetworks(u.Username)
	if err != nil {
		u.srv.Logger.Printf("failed to list networks for user %q: %v", u.Username, err)
		return
	}

	for _, record := range networks {
		network := newNetwork(u, &record)
		u.networks = append(u.networks, network)

		go network.run()
	}

	for e := range u.events {
		switch e := e.(type) {
		case eventUpstreamMessage:
			msg, uc := e.msg, e.uc
			if uc.isClosed() {
				uc.logger.Printf("ignoring message on closed connection: %v", msg)
				break
			}
			if err := uc.handleMessage(msg); err != nil {
				uc.logger.Printf("failed to handle message %q: %v", msg, err)
			}
		case eventDownstreamConnected:
			dc := e.dc

			if err := dc.welcome(); err != nil {
				dc.logger.Printf("failed to handle new registered connection: %v", err)
				break
			}

			u.downstreamConns = append(u.downstreamConns, dc)
		case eventDownstreamDisconnected:
			dc := e.dc
			for i := range u.downstreamConns {
				if u.downstreamConns[i] == dc {
					u.downstreamConns = append(u.downstreamConns[:i], u.downstreamConns[i+1:]...)
					break
				}
			}
		case eventDownstreamMessage:
			msg, dc := e.msg, e.dc
			if dc.isClosed() {
				dc.logger.Printf("ignoring message on closed connection: %v", msg)
				break
			}
			err := dc.handleMessage(msg)
			if ircErr, ok := err.(ircError); ok {
				ircErr.Message.Prefix = dc.srv.prefix()
				dc.SendMessage(ircErr.Message)
			} else if err != nil {
				dc.logger.Printf("failed to handle message %q: %v", msg, err)
				dc.Close()
			}
		default:
			u.srv.Logger.Printf("received unknown event type: %T", e)
		}
	}
}

// createNetwork stores a new network in the database and starts connecting
// to it.
func (u *user) createNetwork(net *Network) (*network, error) {
	if net.ID != 0 {
		panic("tried creating an already-existing network")
	}

	network := newNetwork(u, net)
	err := u.srv.db.StoreNetwork(u.Username, &network.Network)
	if err != nil {
		return nil, err
	}

	u.forEachDownstream(func(dc *downstreamConn) {
		if dc.network == nil {
			dc.runNetwork(network, false)
		}
	})

	u.networks = append(u.networks, network)

	go network.run()
	return network, nil
}