- Timestamp:
- Mar 27, 2020, 11:07:20 PM (5 years ago)
- Location:
- trunk
- Files:
-
- 3 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/downstream.go
r176 r177 1078 1078 sendTopic(dc, ch) 1079 1079 } 1080 case "LIST": 1081 // TODO: support ELIST when supported by all upstreams 1082 1083 dc.user.pendingLISTsLock.Lock() 1084 defer dc.user.pendingLISTsLock.Unlock() 1085 1086 pl := pendingLIST{ 1087 downstreamID: dc.id, 1088 pendingCommands: make(map[int64]*irc.Message), 1089 } 1090 var upstreamChannels map[int64][]string 1091 if len(msg.Params) > 0 { 1092 upstreamChannels = make(map[int64][]string) 1093 channels := strings.Split(msg.Params[0], ",") 1094 for _, channel := range channels { 1095 uc, upstreamChannel, err := dc.unmarshalEntity(channel) 1096 if err != nil { 1097 return err 1098 } 1099 upstreamChannels[uc.network.ID] = append(upstreamChannels[uc.network.ID], upstreamChannel) 1100 } 1101 } 1102 1103 dc.user.pendingLISTs = append(dc.user.pendingLISTs, pl) 1104 dc.forEachUpstream(func(uc *upstreamConn) { 1105 var params []string 1106 if upstreamChannels != nil { 1107 if channels, ok := upstreamChannels[uc.network.ID]; ok { 1108 params = []string{strings.Join(channels, ",")} 1109 } else { 1110 return 1111 } 1112 } 1113 pl.pendingCommands[uc.network.ID] = &irc.Message{ 1114 Command: "LIST", 1115 Params: params, 1116 } 1117 uc.trySendList(dc.id) 1118 }) 1080 1119 case "NAMES": 1081 1120 if len(msg.Params) == 0 { -
trunk/upstream.go
r176 r177 60 60 saslClient sasl.Client 61 61 saslStarted bool 62 63 // set of LIST commands in progress, per downstream 64 // access is synchronized with user.pendingLISTsLock 65 pendingLISTDownstreamSet map[uint64]struct{} 62 66 } 63 67 … … 80 84 outgoing := make(chan *irc.Message, 64) 81 85 uc := &upstreamConn{ 82 network: network, 83 logger: logger, 84 net: netConn, 85 irc: irc.NewConn(netConn), 86 srv: network.user.srv, 87 user: network.user, 88 outgoing: outgoing, 89 channels: make(map[string]*upstreamChannel), 90 caps: make(map[string]string), 91 batches: make(map[string]batch), 92 availableChannelTypes: stdChannelTypes, 93 availableChannelModes: stdChannelModes, 94 availableMemberships: stdMemberships, 86 network: network, 87 logger: logger, 88 net: netConn, 89 irc: irc.NewConn(netConn), 90 srv: network.user.srv, 91 user: network.user, 92 outgoing: outgoing, 93 channels: make(map[string]*upstreamChannel), 94 caps: make(map[string]string), 95 batches: make(map[string]batch), 96 availableChannelTypes: stdChannelTypes, 97 availableChannelModes: stdChannelModes, 98 availableMemberships: stdMemberships, 99 pendingLISTDownstreamSet: make(map[uint64]struct{}), 95 100 } 96 101 … … 137 142 } 138 143 close(uc.closed) 144 145 uc.endPendingLists(true) 139 146 return nil 140 147 } … … 171 178 } 172 179 return false 180 } 181 182 func (uc *upstreamConn) getPendingList() *pendingLIST { 183 uc.user.pendingLISTsLock.Lock() 184 defer uc.user.pendingLISTsLock.Unlock() 185 for _, pl := range uc.user.pendingLISTs { 186 if _, ok := pl.pendingCommands[uc.network.ID]; !ok { 187 continue 188 } 189 return &pl 190 } 191 return nil 192 } 193 194 func (uc *upstreamConn) endPendingLists(all bool) (found bool) { 195 found = false 196 uc.user.pendingLISTsLock.Lock() 197 defer uc.user.pendingLISTsLock.Unlock() 198 for i := 0; i < len(uc.user.pendingLISTs); i++ { 199 pl := uc.user.pendingLISTs[i] 200 if _, ok := pl.pendingCommands[uc.network.ID]; !ok { 201 continue 202 } 203 delete(pl.pendingCommands, uc.network.ID) 204 if len(pl.pendingCommands) == 0 { 205 uc.user.pendingLISTs = append(uc.user.pendingLISTs[:i], uc.user.pendingLISTs[i+1:]...) 206 i-- 207 uc.forEachDownstreamByID(pl.downstreamID, func(dc *downstreamConn) { 208 dc.SendMessage(&irc.Message{ 209 Prefix: dc.srv.prefix(), 210 Command: irc.RPL_LISTEND, 211 Params: []string{dc.nick, "End of /LIST"}, 212 }) 213 }) 214 } 215 found = true 216 if !all { 217 delete(uc.pendingLISTDownstreamSet, pl.downstreamID) 218 uc.user.forEachUpstream(func(uc *upstreamConn) { 219 uc.trySendList(pl.downstreamID) 220 }) 221 return 222 } 223 } 224 return 225 } 226 227 func (uc *upstreamConn) trySendList(downstreamID uint64) { 228 // must be called with a lock in uc.user.pendingLISTsLock 229 230 if _, ok := uc.pendingLISTDownstreamSet[downstreamID]; ok { 231 // a LIST command is already pending 232 // we will try again when that command is completed 233 return 234 } 235 236 for _, pl := range uc.user.pendingLISTs { 237 if pl.downstreamID != downstreamID { 238 continue 239 } 240 // this is the first pending LIST command list of the downstream 241 listCommand, ok := pl.pendingCommands[uc.network.ID] 242 if !ok { 243 // there is no command for this upstream in these LIST commands 244 // do not send anything 245 continue 246 } 247 // there is a command for this upstream in these LIST commands 248 // send it now 249 250 uc.SendMessageLabeled(downstreamID, listCommand) 251 252 uc.pendingLISTDownstreamSet[downstreamID] = struct{}{} 253 return 254 } 173 255 } 174 256 … … 834 916 } 835 917 ch.TopicTime = time.Unix(sec, 0) 918 case irc.RPL_LIST: 919 var channel, clients, topic string 920 if err := parseMessageParams(msg, nil, &channel, &clients, &topic); err != nil { 921 return err 922 } 923 924 pl := uc.getPendingList() 925 if pl == nil { 926 return fmt.Errorf("unexpected RPL_LIST: no matching pending LIST") 927 } 928 929 uc.forEachDownstreamByID(pl.downstreamID, func(dc *downstreamConn) { 930 dc.SendMessage(&irc.Message{ 931 Prefix: dc.srv.prefix(), 932 Command: irc.RPL_LIST, 933 Params: []string{dc.nick, dc.marshalChannel(uc, channel), clients, topic}, 934 }) 935 }) 936 case irc.RPL_LISTEND: 937 ok := uc.endPendingLists(false) 938 if !ok { 939 return fmt.Errorf("unexpected RPL_LISTEND: no matching pending LIST") 940 } 836 941 case irc.RPL_NAMREPLY: 837 942 var name, statusStr, members string … … 1091 1196 }) 1092 1197 }) 1198 case irc.ERR_UNKNOWNCOMMAND, irc.RPL_TRYAGAIN: 1199 var command, reason string 1200 if err := parseMessageParams(msg, nil, &command, &reason); err != nil { 1201 return err 1202 } 1203 1204 if command == "LIST" { 1205 ok := uc.endPendingLists(false) 1206 if !ok { 1207 return fmt.Errorf("unexpected response for LIST: %q: no matching pending LIST", msg.Command) 1208 } 1209 uc.forEachDownstreamByID(downstreamID, func(dc *downstreamConn) { 1210 dc.SendMessage(&irc.Message{ 1211 Prefix: uc.srv.prefix(), 1212 Command: msg.Command, 1213 Params: []string{dc.nick, "LIST", reason}, 1214 }) 1215 }) 1216 } 1093 1217 case "TAGMSG": 1094 1218 // TODO: relay to downstream connections that accept message-tags … … 1101 1225 case irc.RPL_MOTDSTART, irc.RPL_MOTD, irc.RPL_ENDOFMOTD: 1102 1226 // Ignore 1227 case irc.RPL_LISTSTART: 1228 // Ignore 1103 1229 case rpl_localusers, rpl_globalusers: 1104 1230 // Ignore -
trunk/user.go
r175 r177 97 97 networks []*network 98 98 downstreamConns []*downstreamConn 99 100 // LIST commands in progress 101 pendingLISTsLock sync.Mutex 102 pendingLISTs []pendingLIST 103 } 104 105 type pendingLIST struct { 106 downstreamID uint64 107 // list of per-upstream LIST commands not yet sent or completed 108 pendingCommands map[int64]*irc.Message 99 109 } 100 110
Note:
See TracChangeset
for help on using the changeset viewer.