- Timestamp:
- Mar 25, 2020, 10:28:25 AM (5 years ago)
- Location:
- trunk
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/downstream.go
r143 r144 703 703 704 704 dc.forEachNetwork(func(net *network) { 705 historyName := dc.rawUsername 706 707 // TODO: need to take dc.network into account here 708 var seqPtr *uint64 709 if firstDownstream { 710 net.lock.Lock() 711 seq, ok := net.history[historyName] 712 net.lock.Unlock() 713 if ok { 714 seqPtr = &seq 715 } 716 } 717 718 // TODO: we need to create a consumer when adding networks on-the-fly 719 consumer, ch := net.ring.NewConsumer(seqPtr) 720 go func() { 721 for { 722 var closed bool 723 select { 724 case <-ch: 725 uc := net.upstream() 726 if uc == nil { 727 dc.logger.Printf("ignoring messages for upstream %q: upstream is disconnected", net.Addr) 728 break 729 } 730 dc.ringMessages <- ringMessage{consumer, uc} 731 case <-dc.closed: 732 closed = true 733 } 734 if closed { 705 // TODO: need to take dc.network into account when deciding whether or 706 // not to load history 707 dc.runNetwork(net, firstDownstream) 708 }) 709 710 return nil 711 } 712 713 // runNetwork starts listening for messages coming from the network's ring 714 // buffer. 715 // 716 // It panics if the network is not suitable for the downstream connection. 717 func (dc *downstreamConn) runNetwork(net *network, loadHistory bool) { 718 if dc.network != nil && net != dc.network { 719 panic("network not suitable for downstream connection") 720 } 721 722 historyName := dc.rawUsername 723 724 var seqPtr *uint64 725 if loadHistory { 726 net.lock.Lock() 727 seq, ok := net.history[historyName] 728 net.lock.Unlock() 729 if ok { 730 seqPtr = &seq 731 } 732 } 733 734 consumer, ch := net.ring.NewConsumer(seqPtr) 735 go func() { 736 for { 737 var closed bool 738 select { 739 case <-ch: 740 uc := net.upstream() 741 if uc == nil { 742 dc.logger.Printf("ignoring messages for upstream %q: upstream is disconnected", net.Addr) 735 743 break 736 744 } 737 } 738 739 seq := consumer.Close() 740 741 // TODO: need to take dc.network into account here 742 dc.user.lock.Lock() 743 lastDownstream := len(dc.user.downstreamConns) == 0 744 dc.user.lock.Unlock() 745 746 if lastDownstream { 747 net.lock.Lock() 748 net.history[historyName] = seq 749 net.lock.Unlock() 750 } 751 }() 752 }) 753 754 return nil 745 dc.ringMessages <- ringMessage{consumer, uc} 746 case <-dc.closed: 747 closed = true 748 } 749 if closed { 750 break 751 } 752 } 753 754 seq := consumer.Close() 755 756 // TODO: need to take dc.network into account here 757 dc.user.lock.Lock() 758 lastDownstream := len(dc.user.downstreamConns) == 0 759 dc.user.lock.Unlock() 760 761 if lastDownstream { 762 net.lock.Lock() 763 net.history[historyName] = seq 764 net.lock.Unlock() 765 } 766 }() 755 767 } 756 768 -
trunk/user.go
r143 r144 199 199 200 200 func (u *user) createNetwork(net *Network) (*network, error) { 201 if net.ID != 0 { 202 panic("tried creating an already-existing network") 203 } 204 201 205 network := newNetwork(u, net) 202 206 err := u.srv.db.StoreNetwork(u.Username, &network.Network) … … 204 208 return nil, err 205 209 } 210 211 u.forEachDownstream(func(dc *downstreamConn) { 212 if dc.network == nil { 213 dc.runNetwork(network, false) 214 } 215 }) 216 206 217 u.lock.Lock() 207 218 u.networks = append(u.networks, network) 208 219 u.lock.Unlock() 220 209 221 go network.run() 210 222 return network, nil
Note:
See TracChangeset
for help on using the changeset viewer.