|
23 | 23 | from six import iteritems, text_type
|
24 | 24 | from six.moves import range
|
25 | 25 |
|
26 |
| -from canonicaljson import json |
| 26 | +from canonicaljson import encode_canonical_json, json |
27 | 27 | from prometheus_client import Counter, Histogram
|
28 | 28 |
|
29 | 29 | from twisted.internet import defer
|
|
33 | 33 | from synapse.api.errors import SynapseError
|
34 | 34 | from synapse.events import EventBase # noqa: F401
|
35 | 35 | from synapse.events.snapshot import EventContext # noqa: F401
|
| 36 | +from synapse.events.utils import prune_event_dict |
36 | 37 | from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
|
37 | 38 | from synapse.logging.utils import log_function
|
38 | 39 | from synapse.metrics import BucketCollector
|
@@ -262,6 +263,13 @@ def read_forward_extremities():
|
262 | 263 |
|
263 | 264 | hs.get_clock().looping_call(read_forward_extremities, 60 * 60 * 1000)
|
264 | 265 |
|
def _censor_redactions():
    # Hand the store's `_censor_redactions` method to
    # `run_as_background_process` under the name "_censor_redactions" and
    # return whatever that call returns, so the caller scheduling this
    # function gets the result back.
    process_name = "_censor_redactions"
    target = self._censor_redactions
    return run_as_background_process(process_name, target)
| 270 | + |
| 271 | + hs.get_clock().looping_call(_censor_redactions, 10 * 60 * 1000) |
| 272 | + |
265 | 273 | @defer.inlineCallbacks
|
266 | 274 | def _read_forward_extremities(self):
|
267 | 275 | def fetch(txn):
|
@@ -1548,6 +1556,84 @@ def _store_redaction(self, txn, event):
|
1548 | 1556 | (event.event_id, event.redacts),
|
1549 | 1557 | )
|
1550 | 1558 |
|
@defer.inlineCallbacks
def _censor_redactions(self):
    """Censors all redactions older than a month that haven't been censored.

    By censor we mean update the event_json table with the redacted
    (pruned) form of the original event.

    Does nothing if `self.stream_ordering_month_ago` is None. At most 100
    redactions are handled per call. Every redaction that is examined is
    marked `have_censored`, whether or not it turned out to be allowed, so
    it is not examined again.

    Returns:
        Deferred
    """

    if self.stream_ordering_month_ago is None:
        return

    max_pos = self.stream_ordering_month_ago

    # We fetch all redactions which themselves have a stream ordering from
    # over a month ago, which point to an event that we have in the same
    # room, and which we haven't yet censored in the DB.
    #
    # NOTE(review): the lower bound of -max_pos presumably exists to bound
    # negative (backfilled) stream orderings as well - confirm.
    sql = """
        SELECT er.event_id, redacts FROM redactions
        INNER JOIN events AS er USING (event_id)
        INNER JOIN events AS eb ON (er.room_id = eb.room_id AND redacts = eb.event_id)
        WHERE NOT have_censored
        AND ? <= er.stream_ordering AND er.stream_ordering <= ?
        ORDER BY er.stream_ordering ASC
        LIMIT ?
    """

    rows = yield self._execute(
        "_censor_redactions_fetch", None, sql, -max_pos, max_pos, 100
    )

    if not rows:
        # Nothing to censor; don't bother opening a write transaction.
        return

    updates = []

    for redaction_id, event_id in rows:
        redaction_event = yield self.get_event(redaction_id, allow_none=True)
        original_event = yield self.get_event(
            event_id, allow_rejected=True, allow_none=True
        )

        # The SQL above ensures that we have both the redaction and
        # original event, so if the `get_event` calls return None it
        # means that the redaction wasn't allowed. Either way we know that
        # the result won't change so we mark the fact that we've checked.
        if (
            redaction_event
            and original_event
            and original_event.internal_metadata.is_redacted()
        ):
            # Redaction was allowed. `encode_canonical_json` returns bytes,
            # so decode it to text before it is written to the `json`
            # column of `event_json`.
            pruned_json = encode_canonical_json(
                prune_event_dict(original_event.get_dict())
            ).decode("utf-8")
        else:
            # Redaction wasn't allowed
            pruned_json = None

        updates.append((redaction_id, event_id, pruned_json))

    def _update_censor_txn(txn):
        for redaction_id, event_id, pruned_json in updates:
            # Only overwrite the stored JSON when the redaction was
            # allowed; `None` marks a redaction that was not.
            if pruned_json is not None:
                self._simple_update_one_txn(
                    txn,
                    table="event_json",
                    keyvalues={"event_id": event_id},
                    updatevalues={"json": pruned_json},
                )

            # Always record that this redaction has been looked at.
            self._simple_update_one_txn(
                txn,
                table="redactions",
                keyvalues={"event_id": redaction_id},
                updatevalues={"have_censored": True},
            )

    yield self.runInteraction("_update_censor_txn", _update_censor_txn)
| 1636 | + |
1551 | 1637 | @defer.inlineCallbacks
|
1552 | 1638 | def count_daily_messages(self):
|
1553 | 1639 | """
|
|
0 commit comments