Changeset 188 in code
- Timestamp:
- Mar 31, 2020, 4:16:54 PM (5 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/downstream.go
r185 r188 52 52 }} 53 53 54 type ringMessage struct {55 consumer *RingConsumer56 upstreamConn *upstreamConn57 }58 59 54 type downstreamConn struct { 60 id uint64 61 net net.Conn 62 irc *irc.Conn 63 srv *Server 64 logger Logger 65 outgoing chan *irc.Message 66 ringMessages chan ringMessage 67 closed chan struct{} 55 id uint64 56 net net.Conn 57 irc *irc.Conn 58 srv *Server 59 logger Logger 60 outgoing chan *irc.Message 61 closed chan struct{} 68 62 69 63 registered bool … … 90 84 func newDownstreamConn(srv *Server, netConn net.Conn, id uint64) *downstreamConn { 91 85 dc := &downstreamConn{ 92 id: id, 93 net: netConn, 94 irc: irc.NewConn(netConn), 95 srv: srv, 96 logger: &prefixLogger{srv.Logger, fmt.Sprintf("downstream %q: ", netConn.RemoteAddr())}, 97 outgoing: make(chan *irc.Message, 64), 98 ringMessages: make(chan ringMessage), 99 closed: make(chan struct{}), 100 caps: make(map[string]bool), 101 ourMessages: make(map[*irc.Message]struct{}), 86 id: id, 87 net: netConn, 88 irc: irc.NewConn(netConn), 89 srv: srv, 90 logger: &prefixLogger{srv.Logger, fmt.Sprintf("downstream %q: ", netConn.RemoteAddr())}, 91 outgoing: make(chan *irc.Message, 64), 92 closed: make(chan struct{}), 93 caps: make(map[string]bool), 94 ourMessages: make(map[*irc.Message]struct{}), 102 95 } 103 96 dc.hostname = netConn.RemoteAddr().String() … … 258 251 } 259 252 err = dc.irc.WriteMessage(msg) 260 case ringMessage := <-dc.ringMessages:261 consumer, uc := ringMessage.consumer, ringMessage.upstreamConn262 for {263 msg := consumer.Peek()264 if msg == nil {265 break266 }267 268 dc.lock.Lock()269 _, ours := dc.ourMessages[msg]270 delete(dc.ourMessages, msg)271 dc.lock.Unlock()272 if ours {273 // The message comes from our connection, don't echo it274 // back275 consumer.Consume()276 continue277 }278 279 msg = msg.Copy()280 switch msg.Command {281 case "PRIVMSG":282 msg.Prefix = dc.marshalUserPrefix(uc, msg.Prefix)283 msg.Params[0] = dc.marshalEntity(uc, msg.Params[0])284 default:285 panic("expected to consume a PRIVMSG message")286 }287 if dc.srv.Debug {288 dc.logger.Printf("sent: %v", msg)289 }290 err = dc.irc.WriteMessage(msg)291 if err != nil {292 break293 }294 consumer.Consume()295 }296 253 case <-dc.closed: 297 254 closed = true … … 775 732 break 776 733 } 777 dc.ringMessages <- ringMessage{consumer, uc} 734 735 for { 736 msg := consumer.Peek() 737 if msg == nil { 738 break 739 } 740 741 dc.lock.Lock() 742 _, ours := dc.ourMessages[msg] 743 delete(dc.ourMessages, msg) 744 dc.lock.Unlock() 745 if ours { 746 // The message comes from our connection, don't echo it 747 // back 748 consumer.Consume() 749 continue 750 } 751 752 msg = msg.Copy() 753 switch msg.Command { 754 case "PRIVMSG": 755 msg.Prefix = dc.marshalUserPrefix(uc, msg.Prefix) 756 msg.Params[0] = dc.marshalEntity(uc, msg.Params[0]) 757 default: 758 panic("expected to consume a PRIVMSG message") 759 } 760 761 dc.SendMessage(msg) 762 consumer.Consume() 763 } 778 764 case <-dc.closed: 779 765 closed = true
Note:
See TracChangeset
for help on using the changeset viewer.