Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Make the media /upload tracing less ambiguous #15888

Merged
merged 5 commits into from
Jul 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/15888.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add tracing to media `/upload` code paths.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

78 changes: 48 additions & 30 deletions synapse/media/media_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@

from synapse.api.errors import NotFoundError
from synapse.logging.context import defer_to_thread, make_deferred_yieldable
from synapse.logging.opentracing import trace
from synapse.logging.opentracing import start_active_span, trace, trace_with_opname
from synapse.util import Clock
from synapse.util.file_consumer import BackgroundFileConsumer

Expand Down Expand Up @@ -77,7 +77,7 @@ def __init__(
self._spam_checker_module_callbacks = hs.get_module_api_callbacks().spam_checker
self.clock = hs.get_clock()

@trace
@trace_with_opname("MediaStorage.store_file")
async def store_file(self, source: IO, file_info: FileInfo) -> str:
"""Write `source` to the on disk media store, and also any other
configured storage providers
Expand All @@ -91,18 +91,19 @@ async def store_file(self, source: IO, file_info: FileInfo) -> str:
"""

with self.store_into_file(file_info) as (f, fname, finish_cb):
# Write to the main repository
# Write to the main media repository
await self.write_to_file(source, f)
# Write to the other storage providers
await finish_cb()

return fname

@trace
@trace_with_opname("MediaStorage.write_to_file")
async def write_to_file(self, source: IO, output: IO) -> None:
"""Asynchronously write the `source` to `output`."""
await defer_to_thread(self.reactor, _write_file_synchronously, source, output)

@trace
@trace_with_opname("MediaStorage.store_into_file")
@contextlib.contextmanager
def store_into_file(
self, file_info: FileInfo
Expand All @@ -117,9 +118,9 @@ def store_into_file(
fname can be used to read the contents from after upload, e.g. to
generate thumbnails.

finish_cb must be called and waited on after the file has been
successfully been written to. Should not be called if there was an
error.
finish_cb must be called and waited on after the file has been successfully been
written to. Should not be called if there was an error. Checks for spam and
stores the file into the configured storage providers.

Args:
file_info: Info about the file to store
Expand All @@ -139,43 +140,60 @@ def store_into_file(

finished_called = [False]

main_media_repo_write_trace_scope = start_active_span(
"writing to main media repo"
)
main_media_repo_write_trace_scope.__enter__()

try:
with open(fname, "wb") as f:

async def finish() -> None:
# Ensure that all writes have been flushed and close the
# file.
f.flush()
f.close()

spam_check = await self._spam_checker_module_callbacks.check_media_file_for_spam(
ReadableFileWrapper(self.clock, fname), file_info
)
if spam_check != self._spam_checker_module_callbacks.NOT_SPAM:
logger.info("Blocking media due to spam checker")
# Note that we'll delete the stored media, due to the
# try/except below. The media also won't be stored in
# the DB.
# We currently ignore any additional field returned by
# the spam-check API.
raise SpamMediaException(errcode=spam_check[0])

for provider in self.storage_providers:
await provider.store_file(path, file_info)

finished_called[0] = True
# When someone calls finish, we assume they are done writing to the main media repo
main_media_repo_write_trace_scope.__exit__(None, None, None)

with start_active_span("writing to other storage providers"):
# Ensure that all writes have been flushed and close the
# file.
f.flush()
f.close()

spam_check = await self._spam_checker_module_callbacks.check_media_file_for_spam(
ReadableFileWrapper(self.clock, fname), file_info
)
if spam_check != self._spam_checker_module_callbacks.NOT_SPAM:
logger.info("Blocking media due to spam checker")
# Note that we'll delete the stored media, due to the
# try/except below. The media also won't be stored in
# the DB.
# We currently ignore any additional field returned by
# the spam-check API.
raise SpamMediaException(errcode=spam_check[0])

for provider in self.storage_providers:
with start_active_span(str(provider)):
await provider.store_file(path, file_info)

finished_called[0] = True

yield f, fname, finish
except Exception as e:
try:
main_media_repo_write_trace_scope.__exit__(
type(e), None, e.__traceback__
)
os.remove(fname)
except Exception:
pass

raise e from None

if not finished_called:
raise Exception("Finished callback not called")
exc = Exception("Finished callback not called")
main_media_repo_write_trace_scope.__exit__(
type(exc), None, exc.__traceback__
)
raise exc

async def fetch_media(self, file_info: FileInfo) -> Optional[Responder]:
"""Attempts to fetch media described by file_info from the local cache
Expand Down
25 changes: 13 additions & 12 deletions synapse/media/storage_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

from synapse.config._base import Config
from synapse.logging.context import defer_to_thread, run_in_background
from synapse.logging.opentracing import trace
from synapse.logging.opentracing import start_active_span, trace_with_opname
from synapse.util.async_helpers import maybe_awaitable

from ._base import FileInfo, Responder
Expand Down Expand Up @@ -87,7 +87,7 @@ def __init__(
def __str__(self) -> str:
return "StorageProviderWrapper[%s]" % (self.backend,)

@trace
@trace_with_opname("StorageProviderWrapper.store_file")
async def store_file(self, path: str, file_info: FileInfo) -> None:
if not file_info.server_name and not self.store_local:
return None
Expand Down Expand Up @@ -116,7 +116,7 @@ async def store() -> None:

run_in_background(store)

@trace
@trace_with_opname("StorageProviderWrapper.fetch")
async def fetch(self, path: str, file_info: FileInfo) -> Optional[Responder]:
if file_info.url_cache:
# Files in the URL preview cache definitely aren't stored here,
Expand Down Expand Up @@ -144,7 +144,7 @@ def __init__(self, hs: "HomeServer", config: str):
def __str__(self) -> str:
return "FileStorageProviderBackend[%s]" % (self.base_directory,)

@trace
@trace_with_opname("FileStorageProviderBackend.store_file")
async def store_file(self, path: str, file_info: FileInfo) -> None:
"""See StorageProvider.store_file"""

Expand All @@ -156,14 +156,15 @@ async def store_file(self, path: str, file_info: FileInfo) -> None:

# mypy needs help inferring the type of the second parameter, which is generic
shutil_copyfile: Callable[[str, str], str] = shutil.copyfile
await defer_to_thread(
self.hs.get_reactor(),
shutil_copyfile,
primary_fname,
backup_fname,
)

@trace
with start_active_span("shutil_copyfile"):
await defer_to_thread(
self.hs.get_reactor(),
shutil_copyfile,
primary_fname,
backup_fname,
)

@trace_with_opname("FileStorageProviderBackend.fetch")
async def fetch(self, path: str, file_info: FileInfo) -> Optional[Responder]:
"""See StorageProvider.fetch"""

Expand Down