Legend:
- Unmodified
- Added
- Removed
-
trunk/downstream.go
r166 r167 309 309 if dc.isClosed() { 310 310 return fmt.Errorf("downstream connection already closed") 311 }312 313 if u := dc.user; u != nil {314 u.removeDownstream(dc)315 311 } 316 312 … … 758 754 seq := consumer.Close() 759 755 760 // TODO: need to take dc.network into account here 761 dc.user.lock.Lock() 762 lastDownstream := len(dc.user.downstreamConns) == 0 763 dc.user.lock.Unlock() 764 765 if lastDownstream { 766 net.lock.Lock() 767 net.history[historyName] = seq 768 net.lock.Unlock() 769 } 756 net.lock.Lock() 757 net.history[historyName] = seq 758 net.lock.Unlock() 770 759 }() 771 760 } -
trunk/server.go
r166 r167 124 124 dc.logger.Print(err) 125 125 } 126 dc.user.events <- eventDownstreamDisconnected{dc} 126 127 } 127 128 dc.Close() -
trunk/user.go
r166 r167 21 21 22 22 type eventDownstreamConnected struct { 23 dc *downstreamConn 24 } 25 26 type eventDownstreamDisconnected struct { 23 27 dc *downstreamConn 24 28 } … … 170 174 u.downstreamConns = append(u.downstreamConns, dc) 171 175 u.lock.Unlock() 176 case eventDownstreamDisconnected: 177 dc := e.dc 178 u.lock.Lock() 179 for i := range u.downstreamConns { 180 if u.downstreamConns[i] == dc { 181 u.downstreamConns = append(u.downstreamConns[:i], u.downstreamConns[i+1:]...) 182 break 183 } 184 } 185 u.lock.Unlock() 172 186 case eventDownstreamMessage: 173 187 msg, dc := e.msg, e.dc … … 190 204 } 191 205 192 func (u *user) removeDownstream(dc *downstreamConn) {193 u.lock.Lock()194 for i := range u.downstreamConns {195 if u.downstreamConns[i] == dc {196 u.downstreamConns = append(u.downstreamConns[:i], u.downstreamConns[i+1:]...)197 break198 }199 }200 u.lock.Unlock()201 }202 203 206 func (u *user) createNetwork(net *Network) (*network, error) { 204 207 if net.ID != 0 {
Note:
See TracChangeset
for help on using the changeset viewer.