- Timestamp:
- Apr 1, 2020, 2:02:31 PM (5 years ago)
- Location:
- trunk
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/downstream.go
r203 r204 72 72 network *network // can be nil 73 73 74 ringConsumers map[*network]*RingConsumer 75 74 76 negociatingCaps bool 75 77 capVersion int … … 84 86 func newDownstreamConn(srv *Server, netConn net.Conn, id uint64) *downstreamConn { 85 87 dc := &downstreamConn{ 86 id: id, 87 net: netConn, 88 irc: irc.NewConn(netConn), 89 srv: srv, 90 logger: &prefixLogger{srv.Logger, fmt.Sprintf("downstream %q: ", netConn.RemoteAddr())}, 91 outgoing: make(chan *irc.Message, 64), 92 closed: make(chan struct{}), 93 caps: make(map[string]bool), 94 ourMessages: make(map[*irc.Message]struct{}), 88 id: id, 89 net: netConn, 90 irc: irc.NewConn(netConn), 91 srv: srv, 92 logger: &prefixLogger{srv.Logger, fmt.Sprintf("downstream %q: ", netConn.RemoteAddr())}, 93 outgoing: make(chan *irc.Message, 64), 94 closed: make(chan struct{}), 95 ringConsumers: make(map[*network]*RingConsumer), 96 caps: make(map[string]bool), 97 ourMessages: make(map[*irc.Message]struct{}), 95 98 } 96 99 dc.hostname = netConn.RemoteAddr().String() … … 723 726 var seqPtr *uint64 724 727 if loadHistory { 725 net.lock.Lock()726 728 seq, ok := net.history[dc.clientName] 727 net.lock.Unlock()728 729 if ok { 729 730 seqPtr = &seq … … 736 737 737 738 consumer, ch := net.ring.NewConsumer(seqPtr) 739 740 if _, ok := dc.ringConsumers[net]; ok { 741 panic("network has been added twice") 742 } 743 dc.ringConsumers[net] = consumer 744 738 745 go func() { 739 for { 740 var closed bool 741 select { 742 case _, ok := <-ch: 743 if !ok { 744 closed = true 746 for range ch { 747 uc := net.upstream() 748 if uc == nil { 749 dc.logger.Printf("ignoring messages for upstream %q: upstream is disconnected", net.Addr) 750 continue 751 } 752 753 for { 754 msg := consumer.Peek() 755 if msg == nil { 745 756 break 746 757 } 747 758 748 uc := net.upstream() 749 if uc == nil { 750 dc.logger.Printf("ignoring messages for upstream %q: upstream is disconnected", net.Addr) 751 break 759 dc.lock.Lock() 760 _, ours := dc.ourMessages[msg] 761 delete(dc.ourMessages, msg) 762 dc.lock.Unlock() 763 if ours { 764 // The message comes from our connection, don't echo it 765 // back 766 consumer.Consume() 767 continue 752 768 } 753 769 754 for { 755 msg := consumer.Peek() 756 if msg == nil { 757 break 758 } 759 760 dc.lock.Lock() 761 _, ours := dc.ourMessages[msg] 762 delete(dc.ourMessages, msg) 763 dc.lock.Unlock() 764 if ours { 765 // The message comes from our connection, don't echo it 766 // back 767 consumer.Consume() 768 continue 769 } 770 771 msg = msg.Copy() 772 switch msg.Command { 773 case "PRIVMSG": 774 msg.Prefix = dc.marshalUserPrefix(uc, msg.Prefix) 775 msg.Params[0] = dc.marshalEntity(uc, msg.Params[0]) 776 default: 777 panic("expected to consume a PRIVMSG message") 778 } 779 780 if !msgTagsEnabled { 781 for name := range msg.Tags { 782 supported := false 783 switch name { 784 case "time": 785 supported = serverTimeEnabled 786 } 787 if !supported { 788 delete(msg.Tags, name) 789 } 770 msg = msg.Copy() 771 switch msg.Command { 772 case "PRIVMSG": 773 msg.Prefix = dc.marshalUserPrefix(uc, msg.Prefix) 774 msg.Params[0] = dc.marshalEntity(uc, msg.Params[0]) 775 default: 776 panic("expected to consume a PRIVMSG message") 777 } 778 779 if !msgTagsEnabled { 780 for name := range msg.Tags { 781 supported := false 782 switch name { 783 case "time": 784 supported = serverTimeEnabled 785 } 786 if !supported { 787 delete(msg.Tags, name) 790 788 } 791 789 } 792 793 dc.SendMessage(msg)794 consumer.Consume()795 790 } 796 case <-dc.closed: 797 closed = true 798 } 799 if closed { 800 break 801 } 802 } 803 804 // TODO: close the consumer from the user goroutine, so we don't need 805 // that net.history lock 806 seq := consumer.Close() 807 808 net.lock.Lock() 809 net.history[dc.clientName] = seq 810 net.lock.Unlock() 791 792 dc.SendMessage(msg) 793 consumer.Consume() 794 } 795 } 811 796 }() 812 797 } -
trunk/user.go
r203 r204 42 42 stopped chan struct{} 43 43 44 lock sync.Mutex45 conn *upstreamConn46 44 history map[string]uint64 45 46 lock sync.Mutex 47 conn *upstreamConn 47 48 } 48 49 … … 236 237 case eventDownstreamDisconnected: 237 238 dc := e.dc 239 240 for net, rc := range dc.ringConsumers { 241 seq := rc.Close() 242 net.history[dc.clientName] = seq 243 } 244 238 245 for i := range u.downstreamConns { 239 246 if u.downstreamConns[i] == dc {
Note:
See TracChangeset
for help on using the changeset viewer.