Changeset 177 in code for trunk/upstream.go
- Timestamp:
- Mar 27, 2020, 11:07:20 PM (5 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/upstream.go
r176 r177 60 60 saslClient sasl.Client 61 61 saslStarted bool 62 63 // set of LIST commands in progress, per downstream 64 // access is synchronized with user.pendingLISTsLock 65 pendingLISTDownstreamSet map[uint64]struct{} 62 66 } 63 67 … … 80 84 outgoing := make(chan *irc.Message, 64) 81 85 uc := &upstreamConn{ 82 network: network, 83 logger: logger, 84 net: netConn, 85 irc: irc.NewConn(netConn), 86 srv: network.user.srv, 87 user: network.user, 88 outgoing: outgoing, 89 channels: make(map[string]*upstreamChannel), 90 caps: make(map[string]string), 91 batches: make(map[string]batch), 92 availableChannelTypes: stdChannelTypes, 93 availableChannelModes: stdChannelModes, 94 availableMemberships: stdMemberships, 86 network: network, 87 logger: logger, 88 net: netConn, 89 irc: irc.NewConn(netConn), 90 srv: network.user.srv, 91 user: network.user, 92 outgoing: outgoing, 93 channels: make(map[string]*upstreamChannel), 94 caps: make(map[string]string), 95 batches: make(map[string]batch), 96 availableChannelTypes: stdChannelTypes, 97 availableChannelModes: stdChannelModes, 98 availableMemberships: stdMemberships, 99 pendingLISTDownstreamSet: make(map[uint64]struct{}), 95 100 } 96 101 … … 137 142 } 138 143 close(uc.closed) 144 145 uc.endPendingLists(true) 139 146 return nil 140 147 } … … 171 178 } 172 179 return false 180 } 181 182 func (uc *upstreamConn) getPendingList() *pendingLIST { 183 uc.user.pendingLISTsLock.Lock() 184 defer uc.user.pendingLISTsLock.Unlock() 185 for _, pl := range uc.user.pendingLISTs { 186 if _, ok := pl.pendingCommands[uc.network.ID]; !ok { 187 continue 188 } 189 return &pl 190 } 191 return nil 192 } 193 194 func (uc *upstreamConn) endPendingLists(all bool) (found bool) { 195 found = false 196 uc.user.pendingLISTsLock.Lock() 197 defer uc.user.pendingLISTsLock.Unlock() 198 for i := 0; i < len(uc.user.pendingLISTs); i++ { 199 pl := uc.user.pendingLISTs[i] 200 if _, ok := pl.pendingCommands[uc.network.ID]; !ok { 201 continue 202 } 203 delete(pl.pendingCommands, uc.network.ID) 204 if len(pl.pendingCommands) == 0 { 205 uc.user.pendingLISTs = append(uc.user.pendingLISTs[:i], uc.user.pendingLISTs[i+1:]...) 206 i-- 207 uc.forEachDownstreamByID(pl.downstreamID, func(dc *downstreamConn) { 208 dc.SendMessage(&irc.Message{ 209 Prefix: dc.srv.prefix(), 210 Command: irc.RPL_LISTEND, 211 Params: []string{dc.nick, "End of /LIST"}, 212 }) 213 }) 214 } 215 found = true 216 if !all { 217 delete(uc.pendingLISTDownstreamSet, pl.downstreamID) 218 uc.user.forEachUpstream(func(uc *upstreamConn) { 219 uc.trySendList(pl.downstreamID) 220 }) 221 return 222 } 223 } 224 return 225 } 226 227 func (uc *upstreamConn) trySendList(downstreamID uint64) { 228 // must be called with a lock in uc.user.pendingLISTsLock 229 230 if _, ok := uc.pendingLISTDownstreamSet[downstreamID]; ok { 231 // a LIST command is already pending 232 // we will try again when that command is completed 233 return 234 } 235 236 for _, pl := range uc.user.pendingLISTs { 237 if pl.downstreamID != downstreamID { 238 continue 239 } 240 // this is the first pending LIST command list of the downstream 241 listCommand, ok := pl.pendingCommands[uc.network.ID] 242 if !ok { 243 // there is no command for this upstream in these LIST commands 244 // do not send anything 245 continue 246 } 247 // there is a command for this upstream in these LIST commands 248 // send it now 249 250 uc.SendMessageLabeled(downstreamID, listCommand) 251 252 uc.pendingLISTDownstreamSet[downstreamID] = struct{}{} 253 return 254 } 173 255 } 174 256 … … 834 916 } 835 917 ch.TopicTime = time.Unix(sec, 0) 918 case irc.RPL_LIST: 919 var channel, clients, topic string 920 if err := parseMessageParams(msg, nil, &channel, &clients, &topic); err != nil { 921 return err 922 } 923 924 pl := uc.getPendingList() 925 if pl == nil { 926 return fmt.Errorf("unexpected RPL_LIST: no matching pending LIST") 927 } 928 929 uc.forEachDownstreamByID(pl.downstreamID, func(dc *downstreamConn) { 930 dc.SendMessage(&irc.Message{ 931 Prefix: dc.srv.prefix(), 932 Command: irc.RPL_LIST, 933 Params: []string{dc.nick, dc.marshalChannel(uc, channel), clients, topic}, 934 }) 935 }) 936 case irc.RPL_LISTEND: 937 ok := uc.endPendingLists(false) 938 if !ok { 939 return fmt.Errorf("unexpected RPL_LISTEND: no matching pending LIST") 940 } 836 941 case irc.RPL_NAMREPLY: 837 942 var name, statusStr, members string … … 1091 1196 }) 1092 1197 }) 1198 case irc.ERR_UNKNOWNCOMMAND, irc.RPL_TRYAGAIN: 1199 var command, reason string 1200 if err := parseMessageParams(msg, nil, &command, &reason); err != nil { 1201 return err 1202 } 1203 1204 if command == "LIST" { 1205 ok := uc.endPendingLists(false) 1206 if !ok { 1207 return fmt.Errorf("unexpected response for LIST: %q: no matching pending LIST", msg.Command) 1208 } 1209 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) { 1210 dc.SendMessage(&irc.Message{ 1211 Prefix: uc.srv.prefix(), 1212 Command: msg.Command, 1213 Params: []string{dc.nick, "LIST", reason}, 1214 }) 1215 }) 1216 } 1093 1217 case "TAGMSG": 1094 1218 // TODO: relay to downstream connections that accept message-tags … … 1101 1225 case irc.RPL_MOTDSTART, irc.RPL_MOTD, irc.RPL_ENDOFMOTD: 1102 1226 // Ignore 1227 case irc.RPL_LISTSTART: 1228 // Ignore 1103 1229 case rpl_localusers, rpl_globalusers: 1104 1230 // Ignore
Note:
See TracChangeset
for help on using the changeset viewer.