Changeset 57 in code for trunk/downstream.go
- Timestamp:
- Feb 17, 2020, 2:46:29 PM (5 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/downstream.go
r56 r57 41 41 42 42 type downstreamConn struct { 43 net net.Conn 44 irc *irc.Conn 45 srv *Server 46 logger Logger 47 messages chan *irc.Message 43 net net.Conn 44 irc *irc.Conn 45 srv *Server 46 logger Logger 47 messages chan *irc.Message 48 consumers chan *RingConsumer 49 closed chan struct{} 48 50 49 51 registered bool 50 52 user *user 51 closed bool52 53 nick string 53 54 username string … … 57 58 func newDownstreamConn(srv *Server, netConn net.Conn) *downstreamConn { 58 59 dc := &downstreamConn{ 59 net: netConn, 60 irc: irc.NewConn(netConn), 61 srv: srv, 62 logger: &prefixLogger{srv.Logger, fmt.Sprintf("downstream %q: ", netConn.RemoteAddr())}, 63 messages: make(chan *irc.Message, 64), 60 net: netConn, 61 irc: irc.NewConn(netConn), 62 srv: srv, 63 logger: &prefixLogger{srv.Logger, fmt.Sprintf("downstream %q: ", netConn.RemoteAddr())}, 64 messages: make(chan *irc.Message, 64), 65 consumers: make(chan *RingConsumer), 66 closed: make(chan struct{}), 64 67 } 65 68 … … 83 86 User: dc.username, 84 87 // TODO: fill the host? 88 } 89 } 90 91 func (dc *downstreamConn) isClosed() bool { 92 select { 93 case <-dc.closed: 94 return true 95 default: 96 return false 85 97 } 86 98 } … … 105 117 } 106 118 107 if dc. closed{119 if dc.isClosed() { 108 120 return nil 109 121 } … … 114 126 115 127 func (dc *downstreamConn) writeMessages() error { 116 for msg := range dc.messages { 117 if err := dc.irc.WriteMessage(msg); err != nil { 118 return err 128 for { 129 var err error 130 var closed bool 131 select { 132 case msg := <-dc.messages: 133 err = dc.irc.WriteMessage(msg) 134 case consumer := <-dc.consumers: 135 for { 136 msg := consumer.Peek() 137 if msg == nil { 138 break 139 } 140 err = dc.irc.WriteMessage(msg) 141 if err != nil { 142 break 143 } 144 consumer.Consume() 145 } 146 case <-dc.closed: 147 closed = true 148 } 149 if err != nil { 150 return err 151 } 152 if closed { 153 break 119 154 } 120 155 } … … 123 158 124 159 func (dc *downstreamConn) Close() error { 125 if dc. closed{160 if dc.isClosed() { 126 161 return fmt.Errorf("downstream connection already closed") 127 162 } … … 135 170 } 136 171 u.lock.Unlock() 137 138 // TODO: figure out a better way to advance the ring buffer consumer cursor 139 u.forEachUpstream(func(uc *upstreamConn) { 140 // TODO: let clients specify the ring buffer name in their username 141 uc.ring.Consumer("").Reset() 142 }) 143 } 144 145 close(dc.messages) 146 dc.closed = true 147 172 } 173 174 close(dc.closed) 148 175 return nil 149 176 } … … 212 239 213 240 u.lock.Lock() 241 firstDownstream := len(u.downstreamConns) == 0 214 242 u.downstreamConns = append(u.downstreamConns, dc) 215 243 u.lock.Unlock() … … 250 278 251 279 // TODO: let clients specify the ring buffer name in their username 252 consumer := uc.ring.Consumer("") 253 for { 254 // TODO: these messages will get lost if the connection is closed 255 msg := consumer.Consume() 256 if msg == nil { 257 break 258 } 259 dc.SendMessage(msg) 260 } 280 historyName := "" 281 282 var seqPtr *uint64 283 if firstDownstream { 284 seq, ok := uc.history[historyName] 285 if ok { 286 seqPtr = &seq 287 } 288 } 289 290 consumer, ch := uc.ring.Consumer(seqPtr) 291 go func() { 292 for { 293 var closed bool 294 select { 295 case <-ch: 296 dc.consumers <- consumer 297 case <-dc.closed: 298 closed = true 299 } 300 if closed { 301 break 302 } 303 } 304 305 seq := consumer.Close() 306 307 dc.user.lock.Lock() 308 lastDownstream := len(dc.user.downstreamConns) == 0 309 dc.user.lock.Unlock() 310 311 if lastDownstream { 312 uc.history[historyName] = seq 313 } 314 }() 261 315 }) 262 316
Note:
See TracChangeset
for help on using the changeset viewer.