Changeset 781 in code for trunk/downstream.go
- Timestamp:
- Feb 11, 2022, 6:41:46 PM (3 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/downstream.go
r780 r781 226 226 "soju.im/bouncer-networks": "", 227 227 "soju.im/bouncer-networks-notify": "", 228 "soju.im/read": "", 228 229 } 229 230 … … 554 555 return 555 556 } 557 if msg.Command == "READ" && !dc.caps["soju.im/read"] { 558 return 559 } 556 560 557 561 dc.srv.metrics.downstreamOutMessagesTotal.Inc() … … 1470 1474 }) 1471 1475 1472 forwardChannel( dc, ch)1476 forwardChannel(ctx, dc, ch) 1473 1477 } 1474 1478 }) … … 1818 1822 ch.Key = key 1819 1823 } 1820 uc.network.attach(c h)1824 uc.network.attach(ctx, ch) 1821 1825 } else { 1822 1826 ch = &Channel{ … … 2751 2755 } 2752 2756 }) 2757 case "READ": 2758 var target, criteria string 2759 if err := parseMessageParams(msg, &target); err != nil { 2760 return ircError{&irc.Message{ 2761 Command: "FAIL", 2762 Params: []string{"READ", "NEED_MORE_PARAMS", "Missing parameters"}, 2763 }} 2764 } 2765 if len(msg.Params) > 1 { 2766 criteria = msg.Params[1] 2767 } 2768 2769 uc, entity, err := dc.unmarshalEntity(target) 2770 if err != nil { 2771 return err 2772 } 2773 entityCM := uc.network.casemap(entity) 2774 2775 r, err := dc.srv.db.GetReadReceipt(ctx, uc.network.ID, entityCM) 2776 if err != nil { 2777 dc.logger.Printf("failed to get the read receipt for %q: %v", entity, err) 2778 return ircError{&irc.Message{ 2779 Command: "FAIL", 2780 Params: []string{"READ", "INTERNAL_ERROR", target, "Internal error"}, 2781 }} 2782 } else if r == nil { 2783 r = &ReadReceipt{ 2784 Target: entityCM, 2785 } 2786 } 2787 2788 broadcast := false 2789 if len(criteria) > 0 { 2790 // TODO: support msgid criteria 2791 criteriaParts := strings.SplitN(criteria, "=", 2) 2792 if len(criteriaParts) != 2 || criteriaParts[0] != "timestamp" { 2793 return ircError{&irc.Message{ 2794 Command: "FAIL", 2795 Params: []string{"READ", "INVALID_PARAMS", criteria, "Unknown criteria"}, 2796 }} 2797 } 2798 2799 timestamp, err := time.Parse(serverTimeLayout, criteriaParts[1]) 2800 if err != nil { 2801 return ircError{&irc.Message{ 2802 Command: "FAIL", 2803 Params: []string{"READ", "INVALID_PARAMS", criteria, "Invalid criteria"}, 2804 }} 2805 } 2806 now := time.Now() 2807 if timestamp.After(now) { 2808 timestamp = now 2809 } 2810 if r.Timestamp.Before(timestamp) { 2811 r.Timestamp = timestamp 2812 if err := dc.srv.db.StoreReadReceipt(ctx, uc.network.ID, r); err != nil { 2813 dc.logger.Printf("failed to store receipt for %q: %v", entity, err) 2814 return ircError{&irc.Message{ 2815 Command: "FAIL", 2816 Params: []string{"READ", "INTERNAL_ERROR", target, "Internal error"}, 2817 }} 2818 } 2819 broadcast = true 2820 } 2821 } 2822 2823 timestampStr := "*" 2824 if !r.Timestamp.IsZero() { 2825 timestampStr = fmt.Sprintf("timestamp=%s", r.Timestamp.UTC().Format(serverTimeLayout)) 2826 } 2827 uc.forEachDownstream(func(d *downstreamConn) { 2828 if broadcast || dc.id == d.id { 2829 d.SendMessage(&irc.Message{ 2830 Prefix: d.prefix(), 2831 Command: "READ", 2832 Params: []string{d.marshalEntity(uc.network, entity), timestampStr}, 2833 }) 2834 } 2835 }) 2753 2836 case "BOUNCER": 2754 2837 var subcommand string
Note:
See TracChangeset
for help on using the changeset viewer.