Changeset 227 in code for trunk/downstream.go
- Timestamp:
- Apr 6, 2020, 4:05:36 PM (5 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/downstream.go
r226 r227 232 232 233 233 dc.conn.SendMessage(msg) 234 } 235 236 func (dc *downstreamConn) sendFromUpstream(msg *irc.Message, uc *upstreamConn) { 237 dc.lock.Lock() 238 _, ours := dc.ourMessages[msg] 239 delete(dc.ourMessages, msg) 240 dc.lock.Unlock() 241 if ours && !dc.getCap("echo-message") { 242 // The message comes from our connection, don't echo it 243 // back 244 return 245 } 246 247 msg = msg.Copy() 248 switch msg.Command { 249 case "PRIVMSG", "NOTICE": 250 msg.Prefix = dc.marshalUserPrefix(uc, msg.Prefix) 251 msg.Params[0] = dc.marshalEntity(uc, msg.Params[0]) 252 default: 253 panic(fmt.Sprintf("unexpected %q message", msg.Command)) 254 } 255 256 dc.SendMessage(msg) 234 257 } 235 258 … … 664 687 665 688 dc.forEachNetwork(func(net *network) { 666 dc.runNetwork(net, sendHistory) 689 var seqPtr *uint64 690 if sendHistory { 691 seq, ok := net.history[dc.clientName] 692 if ok { 693 seqPtr = &seq 694 } 695 } 696 697 consumer, _ := net.ring.NewConsumer(seqPtr) 698 699 if _, ok := dc.ringConsumers[net]; ok { 700 panic("network has been added twice") 701 } 702 dc.ringConsumers[net] = consumer 703 704 // TODO: this means all history is lost when trying to send it while the 705 // upstream is disconnected. We need to store history differently so that 706 // we don't need access to upstreamConn to forward it to a downstream 707 // client. 708 uc := net.upstream() 709 if uc == nil { 710 dc.logger.Printf("ignoring messages for upstream %q: upstream is disconnected", net.Addr) 711 return 712 } 713 714 for { 715 msg := consumer.Peek() 716 if msg == nil { 717 break 718 } 719 720 dc.sendFromUpstream(msg, uc) 721 consumer.Consume() 722 } 667 723 }) 668 724 669 725 return nil 670 }671 672 // runNetwork starts listening for messages coming from the network's ring673 // buffer.674 //675 // It panics if the network is not suitable for the downstream connection.676 func (dc *downstreamConn) runNetwork(net *network, loadHistory bool) {677 if dc.network != nil && net != dc.network {678 panic("network not suitable for downstream connection")679 }680 681 var seqPtr *uint64682 if loadHistory {683 seq, ok := net.history[dc.clientName]684 if ok {685 seqPtr = &seq686 }687 }688 689 consumer, ch := net.ring.NewConsumer(seqPtr)690 691 if _, ok := dc.ringConsumers[net]; ok {692 panic("network has been added twice")693 }694 dc.ringConsumers[net] = consumer695 696 go func() {697 for range ch {698 uc := net.upstream()699 if uc == nil {700 dc.logger.Printf("ignoring messages for upstream %q: upstream is disconnected", net.Addr)701 continue702 }703 704 for {705 msg := consumer.Peek()706 if msg == nil {707 break708 }709 710 dc.lock.Lock()711 _, ours := dc.ourMessages[msg]712 delete(dc.ourMessages, msg)713 dc.lock.Unlock()714 if ours && !dc.getCap("echo-message") {715 // The message comes from our connection, don't echo it716 // back717 consumer.Consume()718 continue719 }720 721 msg = msg.Copy()722 switch msg.Command {723 case "PRIVMSG", "NOTICE":724 msg.Prefix = dc.marshalUserPrefix(uc, msg.Prefix)725 msg.Params[0] = dc.marshalEntity(uc, msg.Params[0])726 default:727 panic(fmt.Sprintf("unexpected %q message", msg.Command))728 }729 730 dc.SendMessage(msg)731 consumer.Consume()732 }733 }734 }()735 726 } 736 727
Note:
See TracChangeset
for help on using the changeset viewer.