Skip to content

Commit

Permalink
Fix bug where task cache storage is misconfigured (#15433)
Browse files Browse the repository at this point in the history
  • Loading branch information
desertaxle authored Sep 19, 2024
1 parent 54f6d06 commit e248442
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 1 deletion.
2 changes: 1 addition & 1 deletion src/prefect/results.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ async def update_for_task(self: Self, task: "Task") -> Self:
storage = task.cache_policy.key_storage
if isinstance(storage, str) and not len(storage.split("/")) == 2:
storage = Path(storage)
update["result_storage"] = await resolve_result_storage(storage)
update["metadata_storage"] = await resolve_result_storage(storage)
if task.cache_policy.lock_manager is not None:
update["lock_manager"] = task.cache_policy.lock_manager

Expand Down
11 changes: 11 additions & 0 deletions tests/results/test_task_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import pytest

from prefect.cache_policies import Inputs
from prefect.filesystems import LocalFileSystem
from prefect.flows import flow
from prefect.results import get_result_store
Expand Down Expand Up @@ -396,6 +397,16 @@ def bar():
await api_state.result()


async def test_result_store_correctly_receives_metadata_storage(tmp_path):
@task(persist_result=True, cache_policy=Inputs().configure(key_storage=tmp_path))
def bar():
return get_result_store()

result_store = bar()
assert result_store.metadata_storage == LocalFileSystem(basepath=tmp_path)
assert result_store.result_storage != result_store.metadata_storage


@pytest.mark.parametrize("empty_type", [dict, list])
@pytest.mark.parametrize("persist_result", [True, False])
def test_task_empty_result_is_retained(persist_result, empty_type):
Expand Down
7 changes: 7 additions & 0 deletions tests/test_tasks.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
import datetime
import inspect
import json
import time
from asyncio import Event, sleep
from functools import partial
Expand Down Expand Up @@ -45,6 +46,7 @@
from prefect.server import models
from prefect.settings import (
PREFECT_DEBUG_MODE,
PREFECT_LOCAL_STORAGE_PATH,
PREFECT_TASK_DEFAULT_RETRIES,
PREFECT_TASKS_REFRESH_CACHE,
PREFECT_UI_URL,
Expand Down Expand Up @@ -1919,7 +1921,12 @@ def foo(x):
return x

foo(1)
# make sure cache key file and result file are both created
assert (tmp_path / expected_cache_key).exists()
assert "prefect_version" in json.loads(
(tmp_path / expected_cache_key).read_text()
)
assert (PREFECT_LOCAL_STORAGE_PATH.value() / expected_cache_key).exists()

@pytest.mark.parametrize(
"isolation_level", [IsolationLevel.SERIALIZABLE, "SERIALIZABLE"]
Expand Down

0 comments on commit e248442

Please sign in to comment.