Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
76 commits
Select commit Hold shift + click to select a range
9a1c92f
Create a stream to track quarantine state changes
turt2live Mar 12, 2026
c3a0e4a
Insert into quarantined media stream upon changes
turt2live Mar 13, 2026
7386f04
Add admin API to access stream data
turt2live Mar 13, 2026
0f782ce
Add background update to insert existing rows
turt2live Mar 16, 2026
f1a35fa
changelog
turt2live Mar 16, 2026
780f13a
Merge branch 'develop' into travis/list-quarantined-media-mk2
turt2live Mar 16, 2026
a78c03c
fix out of bounds on max
turt2live Mar 16, 2026
7963593
Attempt to fix linting
turt2live Mar 16, 2026
d77a76d
bump for ci
turt2live Mar 16, 2026
2ef406e
Merge branch 'develop' into travis/list-quarantined-media-mk2
turt2live Mar 18, 2026
86a6a15
Merge branch 'develop' into travis/list-quarantined-media-mk2
turt2live Mar 30, 2026
62261e0
Merge branch 'develop' into travis/list-quarantined-media-mk2
turt2live Mar 31, 2026
d743711
Add comments
turt2live Mar 31, 2026
81ce22c
Use current token for stream, requiring writer configuration
turt2live Mar 31, 2026
282e670
Add extra safety
turt2live Mar 31, 2026
616e9c8
Split and expand tests
turt2live Mar 31, 2026
914e252
Attempt to fix linting
turt2live Mar 31, 2026
090e220
bump ci
turt2live Mar 31, 2026
cb71d46
Move schema deltas
turt2live Mar 31, 2026
07ec8f4
Fix tests
turt2live Mar 31, 2026
d288ef5
Apply changes from review comments
turt2live Mar 31, 2026
471b3dc
Attempt to fix linting
turt2live Mar 31, 2026
b5eafbc
Merge branch 'develop' into travis/list-quarantined-media-mk2
turt2live Mar 31, 2026
373ed83
Record quarantine changes in more sites
turt2live Apr 1, 2026
e5791a3
spelling
turt2live Apr 1, 2026
067f659
Apply suggestions from code review
turt2live Apr 1, 2026
99b4bf2
Attempt to fix linting
turt2live Apr 1, 2026
5a4ac32
bump ci
turt2live Apr 1, 2026
5eac826
Use `Token` type
turt2live Apr 1, 2026
b755aae
Add more token stuff
turt2live Apr 1, 2026
c157697
split tests, again
turt2live Apr 1, 2026
dbc3445
Use multi stream tokens to fix types?
turt2live Apr 1, 2026
6688723
Attempt to fix linting
turt2live Apr 1, 2026
237f0a6
Revert "Attempt to fix linting"
turt2live Apr 1, 2026
190bda8
Revert "Use multi stream tokens to fix types?"
turt2live Apr 1, 2026
ef63a72
Partial revert "Add more token stuff"
turt2live Apr 1, 2026
519512c
we do need this though
turt2live Apr 1, 2026
c36311a
Revert "we do need this though"
turt2live Apr 1, 2026
8d6d9c8
Revert "Add more token stuff"
turt2live Apr 1, 2026
2de3744
Move stream wait to servlet I guess
turt2live Apr 1, 2026
2325d9b
Remove excess change from lint fixing action
turt2live Apr 1, 2026
2558b61
API changes
turt2live Apr 2, 2026
7469b49
Change background update
turt2live Apr 2, 2026
8255d7b
Attempt to fix linting
turt2live Apr 2, 2026
455f749
*ahem*
turt2live Apr 2, 2026
cbc6ec3
1
turt2live Apr 2, 2026
79b7a8a
Back out changes which try to capture additional sites
turt2live Apr 6, 2026
6006bc1
Apply suggestions from code review
turt2live Apr 6, 2026
f4f7369
Merge remote-tracking branch 'origin/travis/list-quarantined-media-mk…
turt2live Apr 6, 2026
4a7f8fa
Fix from code review
turt2live Apr 6, 2026
bf589b7
More code review changes
turt2live Apr 6, 2026
61ed17f
Add background update test case
turt2live Apr 6, 2026
1a5d3b8
Attempt to fix linting
turt2live Apr 6, 2026
0851400
incorporate hidden suggestion
turt2live Apr 6, 2026
44ddb62
define store
turt2live Apr 6, 2026
3c1f0ea
remove unused var
turt2live Apr 6, 2026
da290fa
Attempt to fix linting
turt2live Apr 6, 2026
35ac9dc
bump ci
turt2live Apr 6, 2026
b4e3755
fix
turt2live Apr 6, 2026
f4bbc99
Attempt to fix linting
turt2live Apr 6, 2026
d17a8f2
Merge branch 'develop' into travis/list-quarantined-media-mk2
turt2live Apr 6, 2026
af05ff3
Different base I guess
turt2live Apr 6, 2026
8c34804
login
turt2live Apr 6, 2026
78beb96
Add sequence to portdb setup
turt2live Apr 8, 2026
53752b0
docs and test changes
turt2live Apr 8, 2026
0d086a4
Attempt to fix linting
turt2live Apr 8, 2026
22d8370
Merge branch 'develop' into travis/list-quarantined-media-mk2
turt2live Apr 8, 2026
1e94897
E_OFF_BY_1
turt2live Apr 8, 2026
9f03d60
Fix docs
turt2live Apr 8, 2026
045dfec
Apply suggestions from code review
turt2live Apr 8, 2026
af5dd7a
address code review again
turt2live Apr 8, 2026
c78e67f
Attempt to fix linting
turt2live Apr 8, 2026
5269e19
bump ci
turt2live Apr 8, 2026
62c533a
Update synapse/rest/admin/media.py
turt2live Apr 9, 2026
083c00f
Update tests/storage/test_room.py
turt2live Apr 9, 2026
51f9f0e
Give more friendly feedback to callers who try to get future data
turt2live Apr 9, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/19558.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add a ["Listing quarantined media changes" Admin API](https://element-hq.github.io/synapse/latest/admin_api/media_admin_api.html#listing-quarantined-media-changes) for retrieving a paginated record of when media became (un)quarantined.
Comment thread
turt2live marked this conversation as resolved.
Comment thread
MadLittleMods marked this conversation as resolved.
2 changes: 1 addition & 1 deletion docker/configure_workers_and_start.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@
},
"media_repository": {
"app": "synapse.app.generic_worker",
"listener_resources": ["media", "client"],
"listener_resources": ["media", "client", "replication"],
"endpoint_patterns": [
"^/_matrix/media/",
"^/_synapse/admin/v1/purge_media_cache$",
Expand Down
36 changes: 36 additions & 0 deletions docs/admin_api/media_admin_api.md
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,42 @@ Response:
{}
```

## Listing quarantined media changes

When media is quarantined or unquarantined, a change record is created in the
database. This API returns those change records in the order they were created.
Comment thread
turt2live marked this conversation as resolved.
Comment thread
turt2live marked this conversation as resolved.

**Note**: This API should be considered *best-effort* and expected to have missing or
duplicate records. Currently, this only captures any media explicitly (un)quarantined by
the media quarantine admin API, and the other cases are tracked by
https://github.com/element-hq/synapse/issues/19672. Historical media uploaded before
Synapse 1.152.0 is backfilled in a background update on a best-effort basis.

Each page has a maximum of 100 records. The first page has the oldest records,
paginating forwards with each `next_batch` value.

Request:

```
GET /_synapse/admin/v1/media/quarantine_changes?from=2
Comment thread
turt2live marked this conversation as resolved.
```

Where `from` is the `next_batch` value from a previous request. It is optional
and defaults to the first page (the value `0`).

Response:

```json
{
"next_batch": 4,
"changes": [
{ "origin": "example.org", "media_id": "abcdefg12345...", "quarantined": true },
{ "origin": "example.org", "media_id": "abcdefg12345...", "quarantined": false },
{ "origin": "another.example.org", "media_id": "abcdefg12345...", "quarantined": true }
Comment thread
turt2live marked this conversation as resolved.
]
}
```

# Delete local media
This API deletes the *local* media from the disk of your own server.
This includes any local thumbnails and copies of media downloaded from
Expand Down
8 changes: 8 additions & 0 deletions docs/workers.md
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,14 @@ configured as stream writer for the `device_lists` stream:
^/_matrix/client/(api/v1|r0|v3|unstable)/keys/device_signing/upload$
^/_matrix/client/(api/v1|r0|v3|unstable)/keys/signatures/upload$

##### The `quarantined_media_changes` stream

The `quarantined_media_changes` stream supports multiple writers. The following endpoints
can be handled by any worker, but should be routed directly to one of the workers
configured as stream writer for the `quarantined_media_changes` stream:

^/_synapse/admin/v1/quarantine_media/.*$
Comment thread
turt2live marked this conversation as resolved.

#### Restrict outbound federation traffic to a specific set of workers

The
Expand Down
5 changes: 5 additions & 0 deletions synapse/_scripts/synapse_port_db.py
Comment thread
turt2live marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@
"users": ["shadow_banned", "approved", "locked", "suspended"],
"un_partial_stated_event_stream": ["rejection_status_changed"],
"users_who_share_rooms": ["share_private"],
"quarantined_media_changes": ["quarantined"],
}


Expand Down Expand Up @@ -912,6 +913,10 @@ def alter_table(txn: LoggingTransaction) -> None:
await self._setup_autoincrement_sequence(
"state_groups_pending_deletion", "sequence_number"
)
await self._setup_sequence(
"quarantined_media_id_seq",
[("quarantined_media_changes", "stream_id")],
)

# Step 3. Get tables.
self.progress.set_state("Fetching tables")
Expand Down
6 changes: 6 additions & 0 deletions synapse/config/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ class WriterLocations:
push_rules: The instances that write to the push stream. Currently
can only be a single instance.
device_lists: The instances that write to the device list stream.
quarantined_media_changes: The instances that write to the quarantined media
changes stream.
"""

events: list[str] = attr.ib(
Expand Down Expand Up @@ -180,6 +182,10 @@ class WriterLocations:
default=["master"],
converter=_instance_to_list_converter,
)
quarantined_media_changes: list[str] = attr.ib(
default=[MAIN_PROCESS_INSTANCE_NAME],
converter=_instance_to_list_converter,
)
Comment thread
turt2live marked this conversation as resolved.


@attr.s(auto_attribs=True)
Expand Down
3 changes: 3 additions & 0 deletions synapse/replication/tcp/streams/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
PresenceStream,
PushersStream,
PushRulesStream,
QuarantinedMediaStream,
ReceiptsStream,
StickyEventsStream,
Stream,
Expand Down Expand Up @@ -73,6 +74,7 @@
ThreadSubscriptionsStream,
UnPartialStatedRoomStream,
UnPartialStatedEventStream,
QuarantinedMediaStream,
)
}

Expand All @@ -96,4 +98,5 @@
"ThreadSubscriptionsStream",
"UnPartialStatedRoomStream",
"UnPartialStatedEventStream",
"QuarantinedMediaStream",
]
47 changes: 47 additions & 0 deletions synapse/replication/tcp/streams/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -808,3 +808,50 @@ async def _update_function(
return [], to_token, False

return rows, rows[-1][0], len(updates) == limit


@attr.s(slots=True, auto_attribs=True)
class QuarantinedMediaStreamRow:
"""Row for QuarantinedMediaStream"""

# We store the origin and media_id as media is scoped to the origin and are uniquely
# identified by (origin, media_id).

origin: str
media_id: str
Comment thread
turt2live marked this conversation as resolved.
quarantined: bool


class QuarantinedMediaStream(_StreamFromIdGen):
"""Stream to track changes to (un)quarantined media."""

NAME = "quarantined_media"
ROW_TYPE = QuarantinedMediaStreamRow

def __init__(self, hs: "HomeServer"):
self.store = hs.get_datastores().main
super().__init__(
hs.get_instance_name(),
self._update_function,
self.store._quarantined_media_changes_id_gen,
)

async def _update_function(
self, instance_name: str, from_token: Token, to_token: Token, limit: int
) -> StreamUpdateResult:
updates = await self.store.get_quarantined_media_changes(
from_id=from_token, to_id=to_token, limit=limit
)
rows = [
(
update.stream_id,
# Args to `QuarantinedMediaStreamRow`
(update.origin, update.media_id, update.quarantined),
)
for update in updates
]

if not rows:
return [], to_token, False

return rows, rows[-1][0], len(updates) == limit
65 changes: 65 additions & 0 deletions synapse/rest/admin/media.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,70 @@ async def on_POST(
return HTTPStatus.OK, {}


class ListQuarantineChanges(RestServlet):
"""Lists the quarantine changes to media.

Uses the pagination format described by https://spec.matrix.org/v1.18/appendices/#pagination
"""

PATTERNS = admin_patterns("/media/quarantine_changes$")

def __init__(self, hs: "HomeServer"):
self.store = hs.get_datastores().main
self.auth = hs.get_auth()
self.server_name = hs.hostname
self.replication = hs.get_replication_data_handler()

async def on_GET(self, request: SynapseRequest) -> tuple[int, JsonDict]:
await assert_requester_is_admin(self.auth, request)

from_id = parse_integer(request, "from", default=0)
Comment thread
MadLittleMods marked this conversation as resolved.
Comment thread
turt2live marked this conversation as resolved.
limit = 100 # arbitrary; not enough to cause problems (hopefully)
Comment thread
MadLittleMods marked this conversation as resolved.
to_id = await self.store.get_current_quarantined_media_stream_id()

if to_id < from_id:
# The caller is trying to get future data, which isn't possible.
raise SynapseError(
HTTPStatus.BAD_REQUEST,
"The `from` position is ahead of the currently persisted position.",
errcode=Codes.INVALID_PARAM,
)

# We need to wait to ensure that our current worker is actually caught up with
# the stream position, otherwise we might not return what we think we're returning.
if not await self.store.wait_for_quarantined_media_stream_id(from_id):
raise SynapseError(
HTTPStatus.INTERNAL_SERVER_ERROR,
"Timed out while waiting for the worker serving this request to catch up to the given "
"`from` stream position. Assuming this is a valid `from` token, this indicates an issue "
"with Synapse or the worker deployment lagging behind the replication stream. Please try "
"the request again later.",
errcode=Codes.UNKNOWN,
)
Copy link
Copy Markdown
Contributor

@MadLittleMods MadLittleMods Apr 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally, we'd also validate from_id before waiting and throw M_INVALID_PARAM. This way people see a more sane error instead of 500 {'errcode': 'M_UNKNOWN', 'error': 'Internal server error'} from the assertion in wait_for_quarantined_media_stream_id(...).

This kind of thing is mentioned in #19644 "tokens should be validated before it reaches this point."

There isn't a helper for this so it would be a similar if from_id < max_persisted_position: error check.

Since this is an admin endpoint we could forgo this but it would be nice to have a good example in the codebase.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be covered in 51f9f0e - let me know if changes are needed

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(will follow up in a subsequent PR if required)

Copy link
Copy Markdown
Contributor

@MadLittleMods MadLittleMods Apr 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The if to_id < from_id: error check that was added in 51f9f0e is a bit flawed since to_id is the current token of the current worker (which could be behind the other workers). (and it's okay for to_id == from_id as that would mean no activity has happened)

I think we want to compare to get_max_allocated_token() which actually looks at the database source of truth across all of the workers.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'd then need to wait for that token too, I believe?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we don't - if we're caught up on from then we might return fewer results, but that's fine.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


changes = await self.store.get_quarantined_media_changes(
from_id=from_id,
to_id=to_id,
limit=limit,
)

serialized_changes = [
{
"origin": c.origin if c.origin is not None else self.server_name,
"media_id": c.media_id,
"quarantined": c.quarantined,
}
for c in changes
]

# We know the last record will have the highest stream ID, so use that one. If
# there aren't any records, just return the `to_id` value because it'll be the
# furthest stream position possible.
next_batch = changes[-1].stream_id if len(changes) > 0 else to_id

return HTTPStatus.OK, {"next_batch": next_batch, "changes": serialized_changes}


class ProtectMediaByID(RestServlet):
"""Protect local media from being quarantined."""

Expand Down Expand Up @@ -529,6 +593,7 @@ def register_servlets_for_media_repo(hs: "HomeServer", http_server: HttpServer)
QuarantineMediaByID(hs).register(http_server)
UnquarantineMediaByID(hs).register(http_server)
QuarantineMediaByUser(hs).register(http_server)
ListQuarantineChanges(hs).register(http_server)
ProtectMediaByID(hs).register(http_server)
UnprotectMediaByID(hs).register(http_server)
ListMediaInRoom(hs).register(http_server)
Expand Down
Loading
Loading