Changeset 781 in code for trunk/downstream.go


Ignore:
Timestamp:
Feb 11, 2022, 6:41:46 PM (3 years ago)
Author:
delthas
Message:

Add support for the wip soju.im/read capability and READ command

READ lets downstream clients share information between each other about
what messages have been read by other downstreams.

Each target/entity has an optional corresponding read receipt, which is
stored as a timestamp.

  • When a downstream sends: READ #chan timestamp=2020-01-01T01:23:45.000Z the read receipt for that target is set to that date
  • soju sends READ to downstreams:
    • on JOIN, if the client uses the soju.im/read capability
    • when the read receipt timestamp is set by any downstream

The read receipt date is clamped by the previous receipt date and the
current time.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/downstream.go

    r780 r781  
    226226        "soju.im/bouncer-networks":        "",
    227227        "soju.im/bouncer-networks-notify": "",
     228        "soju.im/read":                    "",
    228229}
    229230
     
    554555                return
    555556        }
     557        if msg.Command == "READ" && !dc.caps["soju.im/read"] {
     558                return
     559        }
    556560
    557561        dc.srv.metrics.downstreamOutMessagesTotal.Inc()
     
    14701474                        })
    14711475
    1472                         forwardChannel(dc, ch)
     1476                        forwardChannel(ctx, dc, ch)
    14731477                }
    14741478        })
     
    18181822                                        ch.Key = key
    18191823                                }
    1820                                 uc.network.attach(ch)
     1824                                uc.network.attach(ctx, ch)
    18211825                        } else {
    18221826                                ch = &Channel{
     
    27512755                        }
    27522756                })
     2757        case "READ":
     2758                var target, criteria string
     2759                if err := parseMessageParams(msg, &target); err != nil {
     2760                        return ircError{&irc.Message{
     2761                                Command: "FAIL",
     2762                                Params:  []string{"READ", "NEED_MORE_PARAMS", "Missing parameters"},
     2763                        }}
     2764                }
     2765                if len(msg.Params) > 1 {
     2766                        criteria = msg.Params[1]
     2767                }
     2768
     2769                uc, entity, err := dc.unmarshalEntity(target)
     2770                if err != nil {
     2771                        return err
     2772                }
     2773                entityCM := uc.network.casemap(entity)
     2774
     2775                r, err := dc.srv.db.GetReadReceipt(ctx, uc.network.ID, entityCM)
     2776                if err != nil {
     2777                        dc.logger.Printf("failed to get the read receipt for %q: %v", entity, err)
     2778                        return ircError{&irc.Message{
     2779                                Command: "FAIL",
     2780                                Params:  []string{"READ", "INTERNAL_ERROR", target, "Internal error"},
     2781                        }}
     2782                } else if r == nil {
     2783                        r = &ReadReceipt{
     2784                                Target: entityCM,
     2785                        }
     2786                }
     2787
     2788                broadcast := false
     2789                if len(criteria) > 0 {
     2790                        // TODO: support msgid criteria
     2791                        criteriaParts := strings.SplitN(criteria, "=", 2)
     2792                        if len(criteriaParts) != 2 || criteriaParts[0] != "timestamp" {
     2793                                return ircError{&irc.Message{
     2794                                        Command: "FAIL",
     2795                                        Params:  []string{"READ", "INVALID_PARAMS", criteria, "Unknown criteria"},
     2796                                }}
     2797                        }
     2798
     2799                        timestamp, err := time.Parse(serverTimeLayout, criteriaParts[1])
     2800                        if err != nil {
     2801                                return ircError{&irc.Message{
     2802                                        Command: "FAIL",
     2803                                        Params:  []string{"READ", "INVALID_PARAMS", criteria, "Invalid criteria"},
     2804                                }}
     2805                        }
     2806                        now := time.Now()
     2807                        if timestamp.After(now) {
     2808                                timestamp = now
     2809                        }
     2810                        if r.Timestamp.Before(timestamp) {
     2811                                r.Timestamp = timestamp
     2812                                if err := dc.srv.db.StoreReadReceipt(ctx, uc.network.ID, r); err != nil {
     2813                                        dc.logger.Printf("failed to store receipt for %q: %v", entity, err)
     2814                                        return ircError{&irc.Message{
     2815                                                Command: "FAIL",
     2816                                                Params:  []string{"READ", "INTERNAL_ERROR", target, "Internal error"},
     2817                                        }}
     2818                                }
     2819                                broadcast = true
     2820                        }
     2821                }
     2822
     2823                timestampStr := "*"
     2824                if !r.Timestamp.IsZero() {
     2825                        timestampStr = fmt.Sprintf("timestamp=%s", r.Timestamp.UTC().Format(serverTimeLayout))
     2826                }
     2827                uc.forEachDownstream(func(d *downstreamConn) {
     2828                        if broadcast || dc.id == d.id {
     2829                                d.SendMessage(&irc.Message{
     2830                                        Prefix:  d.prefix(),
     2831                                        Command: "READ",
     2832                                        Params:  []string{d.marshalEntity(uc.network, entity), timestampStr},
     2833                                })
     2834                        }
     2835                })
    27532836        case "BOUNCER":
    27542837                var subcommand string
Note: See TracChangeset for help on using the changeset viewer.