- Timestamp:
- Apr 10, 2020, 5:22:47 PM (5 years ago)
- Location:
- trunk
- Files:
-
- 3 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/downstream.go
r249 r253 637 637 return err 638 638 } 639 640 // Only send history if we're the first connected client with that name and641 // network642 sendHistory := true643 dc.user.forEachDownstream(func(conn *downstreamConn) {644 if dc.clientName == conn.clientName && dc.network == conn.network {645 sendHistory = false646 }647 })648 639 649 640 dc.SendMessage(&irc.Message{ … … 689 680 690 681 dc.forEachNetwork(func(net *network) { 691 seq, ok := net.history[dc.clientName] 692 if !sendHistory || !ok { 693 return 694 } 695 696 consumer := net.ring.NewConsumer(seq) 682 // Only send history if we're the first connected client with that name 683 // for the network 684 if _, ok := net.offlineClients[dc.clientName]; ok { 685 dc.sendNetworkHistory(net) 686 delete(net.offlineClients, dc.clientName) 687 } 688 }) 689 690 return nil 691 } 692 693 func (dc *downstreamConn) sendNetworkHistory(net *network) { 694 for target, history := range net.history { 695 seq, ok := history.offlineClients[dc.clientName] 696 if !ok { 697 continue 698 } 699 delete(history.offlineClients, dc.clientName) 700 701 // If all clients have received history, no need to keep the 702 // ring buffer around 703 if len(history.offlineClients) == 0 { 704 delete(net.history, target) 705 } 706 707 consumer := history.ring.NewConsumer(seq) 697 708 698 709 // TODO: this means all history is lost when trying to send it while the … … 726 737 dc.SendMessage(dc.marshalMessage(msg, uc)) 727 738 } 728 }) 729 730 return nil 739 } 731 740 } 732 741 -
trunk/upstream.go
r248 r253 557 557 ch.Members[newNick] = membership 558 558 uc.appendLog(ch.Name, msg) 559 uc.appendHistory(ch.Name, msg) 559 560 } 560 561 } 561 562 562 563 if !me { 563 uc.network.ring.Produce(msg)564 564 uc.forEachDownstream(func(dc *downstreamConn) { 565 565 dc.SendMessage(dc.marshalMessage(msg, uc)) … … 663 663 664 664 uc.appendLog(ch.Name, msg) 665 uc.appendHistory(ch.Name, msg) 665 666 } 666 667 } 667 668 668 669 if msg.Prefix.Name != uc.nick { 669 uc.network.ring.Produce(msg)670 670 uc.forEachDownstream(func(dc *downstreamConn) { 671 671 dc.SendMessage(dc.marshalMessage(msg, uc)) … … 1295 1295 } 1296 1296 1297 // appendHistory appends a message to the history. entity can be empty. 1298 func (uc *upstreamConn) appendHistory(entity string, msg *irc.Message) { 1299 // If no client is offline, no need to append the message to the buffer 1300 if len(uc.network.offlineClients) == 0 { 1301 return 1302 } 1303 1304 history, ok := uc.network.history[entity] 1305 if !ok { 1306 history = &networkHistory{ 1307 offlineClients: make(map[string]uint64), 1308 ring: NewRing(uc.srv.RingCap), 1309 } 1310 uc.network.history[entity] = history 1311 1312 for clientName, _ := range uc.network.offlineClients { 1313 history.offlineClients[clientName] = 0 1314 } 1315 } 1316 1317 history.ring.Produce(msg) 1318 } 1319 1297 1320 // produce appends a message to the logs, adds it to the history and forwards 1298 1321 // it to connected downstream connections. … … 1305 1328 } 1306 1329 1307 uc. network.ring.Produce(msg)1330 uc.appendHistory(target, msg) 1308 1331 1309 1332 uc.forEachDownstream(func(dc *downstreamConn) { -
trunk/user.go
r252 r253 46 46 } 47 47 48 type networkHistory struct { 49 offlineClients map[string]uint64 // indexed by client name 50 ring *Ring // can be nil if there are no offline clients 51 } 52 48 53 type network struct { 49 54 Network 50 55 user *user 51 ring *Ring52 56 stopped chan struct{} 53 57 54 conn *upstreamConn 55 history map[string]uint64 56 lastError error 58 conn *upstreamConn 59 history map[string]*networkHistory // indexed by entity 60 offlineClients map[string]struct{} // indexed by client name 61 lastError error 57 62 } 58 63 59 64 func newNetwork(user *user, record *Network) *network { 60 65 return &network{ 61 Network: *record,62 user: user,63 ring: NewRing(user.srv.RingCap),64 stopped: make(chan struct{}),65 history: make(map[string]uint64),66 Network: *record, 67 user: user, 68 stopped: make(chan struct{}), 69 history: make(map[string]*networkHistory), 70 offlineClients: make(map[string]struct{}), 66 71 } 67 72 } … … 295 300 dc := e.dc 296 301 297 dc.forEachNetwork(func(net *network) {298 seq := net.ring.Cur()299 net.history[dc.clientName] = seq300 })301 302 302 for i := range u.downstreamConns { 303 303 if u.downstreamConns[i] == dc { … … 306 306 } 307 307 } 308 309 // Save history if we're the last client with this name 310 skipHistory := make(map[*network]bool) 311 u.forEachDownstream(func(conn *downstreamConn) { 312 if dc.clientName == conn.clientName { 313 skipHistory[conn.network] = true 314 } 315 }) 316 317 dc.forEachNetwork(func(net *network) { 318 if skipHistory[net] || skipHistory[nil] { 319 return 320 } 321 322 net.offlineClients[dc.clientName] = struct{}{} 323 for _, history := range net.history { 324 history.offlineClients[dc.clientName] = history.ring.Cur() 325 } 326 }) 308 327 309 328 u.forEachUpstream(func(uc *upstreamConn) {
Note:
See TracChangeset
for help on using the changeset viewer.