diff --git a/src/prefect/results.py b/src/prefect/results.py index 7f46e7dc6d32..ce91f14c27e0 100644 --- a/src/prefect/results.py +++ b/src/prefect/results.py @@ -245,7 +245,7 @@ async def _call_explicitly_async_block_method( see https://github.com/PrefectHQ/prefect/issues/15008 """ if hasattr(block, f"a{method}"): # explicit async method - return await getattr(block.__class__.__name__, f"a{method}")(*args, **kwargs) + return await getattr(block, f"a{method}")(*args, **kwargs) elif hasattr(getattr(block, method, None), "aio"): # sync_compatible return await getattr(block, method).aio(block, *args, **kwargs) else: # should not happen in prefect, but users can override impls diff --git a/tests/public/results/test_result_storage.py b/tests/public/results/test_result_storage.py new file mode 100644 index 000000000000..67896b2b9c61 --- /dev/null +++ b/tests/public/results/test_result_storage.py @@ -0,0 +1,59 @@ +from pathlib import Path +from unittest.mock import patch + +import anyio +import pytest + +from prefect import flow, task +from prefect.filesystems import LocalFileSystem +from prefect.results import ResultStore + + +@pytest.fixture +def custom_storage_block(tmpdir: Path): + class Test(LocalFileSystem): + _block_type_slug = "test" + + async def awrite_path(self, path: str, content: bytes) -> str: + _path: Path = self._resolve_path(path) + + _path.parent.mkdir(exist_ok=True, parents=True) + + if _path.exists() and not _path.is_file(): + raise ValueError(f"Path {_path} already exists and is not a file.") + + async with await anyio.open_file(_path, mode="wb") as f: + await f.write(content) + return str(_path) + + Test.register_type_and_schema() + test = Test(basepath=str(tmpdir)) + test.save("test", overwrite=True) + return test + + +async def test_async_method_used_in_async_context( + custom_storage_block: LocalFileSystem, +): + # this is a regression test for https://github.com/PrefectHQ/prefect/issues/16486 + with patch.object( + custom_storage_block, "awrite_path", wraps=custom_storage_block.awrite_path + ) as mock_awrite: + + @task(result_storage=custom_storage_block, result_storage_key="testing") + async def t(): + return "this is a test" + + @flow + async def f(): + return await t() + + result = await f() + assert result == "this is a test" + store = ResultStore(result_storage=custom_storage_block) + stored_result_record = await store.aread("testing") + + assert stored_result_record.result == result == "this is a test" + # Verify awrite_path was called + mock_awrite.assert_awaited_once() + assert mock_awrite.await_args[0][0] == "testing" # Check path argument