- Timestamp:
- Apr 6, 2020, 4:05:36 PM (5 years ago)
- Location:
- trunk
- Files:
-
- 4 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/doc/architecture.md
r170 r227 32 32 and downstream message handlers are called from this goroutine, thus they can 33 33 safely access both upstream and downstream state. 34 35 In addition to these goroutines, each downstream connection also has one36 goroutine per network to handle new upstream messages coming from the ring37 buffer. -
trunk/downstream.go
r226 r227 232 232 233 233 dc.conn.SendMessage(msg) 234 } 235 236 func (dc *downstreamConn) sendFromUpstream(msg *irc.Message, uc *upstreamConn) { 237 dc.lock.Lock() 238 _, ours := dc.ourMessages[msg] 239 delete(dc.ourMessages, msg) 240 dc.lock.Unlock() 241 if ours && !dc.getCap("echo-message") { 242 // The message comes from our connection, don't echo it 243 // back 244 return 245 } 246 247 msg = msg.Copy() 248 switch msg.Command { 249 case "PRIVMSG", "NOTICE": 250 msg.Prefix = dc.marshalUserPrefix(uc, msg.Prefix) 251 msg.Params[0] = dc.marshalEntity(uc, msg.Params[0]) 252 default: 253 panic(fmt.Sprintf("unexpected %q message", msg.Command)) 254 } 255 256 dc.SendMessage(msg) 234 257 } 235 258 … … 664 687 665 688 dc.forEachNetwork(func(net *network) { 666 dc.runNetwork(net, sendHistory) 689 var seqPtr *uint64 690 if sendHistory { 691 seq, ok := net.history[dc.clientName] 692 if ok { 693 seqPtr = &seq 694 } 695 } 696 697 consumer, _ := net.ring.NewConsumer(seqPtr) 698 699 if _, ok := dc.ringConsumers[net]; ok { 700 panic("network has been added twice") 701 } 702 dc.ringConsumers[net] = consumer 703 704 // TODO: this means all history is lost when trying to send it while the 705 // upstream is disconnected. We need to store history differently so that 706 // we don't need access to upstreamConn to forward it to a downstream 707 // client. 708 uc := net.upstream() 709 if uc == nil { 710 dc.logger.Printf("ignoring messages for upstream %q: upstream is disconnected", net.Addr) 711 return 712 } 713 714 for { 715 msg := consumer.Peek() 716 if msg == nil { 717 break 718 } 719 720 dc.sendFromUpstream(msg, uc) 721 consumer.Consume() 722 } 667 723 }) 668 724 669 725 return nil 670 }671 672 // runNetwork starts listening for messages coming from the network's ring673 // buffer.674 //675 // It panics if the network is not suitable for the downstream connection.676 func (dc *downstreamConn) runNetwork(net *network, loadHistory bool) {677 if dc.network != nil && net != dc.network {678 panic("network not suitable for downstream connection")679 }680 681 var seqPtr *uint64682 if loadHistory {683 seq, ok := net.history[dc.clientName]684 if ok {685 seqPtr = &seq686 }687 }688 689 consumer, ch := net.ring.NewConsumer(seqPtr)690 691 if _, ok := dc.ringConsumers[net]; ok {692 panic("network has been added twice")693 }694 dc.ringConsumers[net] = consumer695 696 go func() {697 for range ch {698 uc := net.upstream()699 if uc == nil {700 dc.logger.Printf("ignoring messages for upstream %q: upstream is disconnected", net.Addr)701 continue702 }703 704 for {705 msg := consumer.Peek()706 if msg == nil {707 break708 }709 710 dc.lock.Lock()711 _, ours := dc.ourMessages[msg]712 delete(dc.ourMessages, msg)713 dc.lock.Unlock()714 if ours && !dc.getCap("echo-message") {715 // The message comes from our connection, don't echo it716 // back717 consumer.Consume()718 continue719 }720 721 msg = msg.Copy()722 switch msg.Command {723 case "PRIVMSG", "NOTICE":724 msg.Prefix = dc.marshalUserPrefix(uc, msg.Prefix)725 msg.Params[0] = dc.marshalEntity(uc, msg.Params[0])726 default:727 panic(fmt.Sprintf("unexpected %q message", msg.Command))728 }729 730 dc.SendMessage(msg)731 consumer.Consume()732 }733 }734 }()735 726 } 736 727 -
trunk/upstream.go
r226 r227 1366 1366 func (uc *upstreamConn) produce(msg *irc.Message) { 1367 1367 uc.network.ring.Produce(msg) 1368 1369 uc.forEachDownstream(func(dc *downstreamConn) { 1370 dc.sendFromUpstream(dc.ringConsumers[uc.network].Consume(), uc) 1371 }) 1368 1372 } 1369 1373 -
trunk/user.go
r223 r227 352 352 u.forEachDownstream(func(dc *downstreamConn) { 353 353 if dc.network == nil { 354 dc.runNetwork(network, false) 354 consumer, _ := network.ring.NewConsumer(nil) 355 dc.ringConsumers[network] = consumer 355 356 } 356 357 }) … … 376 377 dc.Close() 377 378 } 379 delete(dc.ringConsumers, net) 378 380 }) 379 381
Note:
See TracChangeset
for help on using the changeset viewer.