Changeset 143 in code for trunk/downstream.go
- Timestamp:
- Mar 25, 2020, 9:53:08 AM (5 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/downstream.go
r142 r143 700 700 } 701 701 } 702 702 }) 703 704 dc.forEachNetwork(func(net *network) { 703 705 historyName := dc.rawUsername 704 706 … … 706 708 var seqPtr *uint64 707 709 if firstDownstream { 708 uc.network.lock.Lock()709 seq, ok := uc.network.history[historyName]710 uc.network.lock.Unlock()710 net.lock.Lock() 711 seq, ok := net.history[historyName] 712 net.lock.Unlock() 711 713 if ok { 712 714 seqPtr = &seq … … 715 717 716 718 // TODO: we need to create a consumer when adding networks on-the-fly 717 consumer, ch := uc.ring.NewConsumer(seqPtr)719 consumer, ch := net.ring.NewConsumer(seqPtr) 718 720 go func() { 719 721 for { … … 721 723 select { 722 724 case <-ch: 725 uc := net.upstream() 726 if uc == nil { 727 dc.logger.Printf("ignoring messages for upstream %q: upstream is disconnected", net.Addr) 728 break 729 } 723 730 dc.ringMessages <- ringMessage{consumer, uc} 724 731 case <-dc.closed: … … 738 745 739 746 if lastDownstream { 740 uc.network.lock.Lock()741 uc.network.history[historyName] = seq742 uc.network.lock.Unlock()747 net.lock.Lock() 748 net.history[historyName] = seq 749 net.lock.Unlock() 743 750 } 744 751 }() … … 1087 1094 dc.lock.Unlock() 1088 1095 1089 uc. ring.Produce(echoMsg)1096 uc.network.ring.Produce(echoMsg) 1090 1097 } 1091 1098 default:
Note:
See TracChangeset
for help on using the changeset viewer.