Skip to content

Commit

Permalink
Adds ability to write result metadata separate from data when using `ResultStore` (#15214)
Browse files Browse the repository at this point in the history
  • Loading branch information
desertaxle authored Sep 4, 2024
1 parent 7ce82cd commit 1f9bed1
Show file tree
Hide file tree
Showing 2 changed files with 153 additions and 7 deletions.
97 changes: 90 additions & 7 deletions src/prefect/results.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,18 @@ def _format_user_supplied_storage_key(key: str) -> str:
class ResultStore(BaseModel):
"""
A utility to generate `Result` types.
Attributes:
result_storage: The storage for result records.
metadata_storage: The storage for result record metadata. If not provided, the metadata will be stored alongside the results.
persist_result: Whether to persist results.
cache_result_in_memory: Whether to cache results in memory.
serializer: The serializer to use for results.
storage_key_fn: The function to generate storage keys.
"""

result_storage: Optional[WritableFileSystem] = Field(default=None)
metadata_storage: Optional[WritableFileSystem] = Field(default=None)
persist_result: bool = Field(default_factory=get_default_persist_setting)
cache_result_in_memory: bool = Field(default=True)
serializer: Serializer = Field(default_factory=get_default_result_serializer)
Expand Down Expand Up @@ -238,6 +247,56 @@ async def update_for_task(self: Self, task: "Task") -> Self:
update["result_storage"] = await get_default_result_storage()
return self.model_copy(update=update)

@sync_compatible
async def _exists(self, key: str) -> bool:
    """
    Check if a result record exists in storage.

    When `metadata_storage` is configured, the record is looked up there;
    otherwise it is looked up in `result_storage`. If `result_storage` has
    not been set yet, the default result storage is resolved first, the same
    way the read and persist paths do, so that `exists` and `read` agree on
    which storage they consult.

    Args:
        key: The key to check for the existence of a result record.

    Returns:
        bool: True if the result record exists, False otherwise. Any error
            raised while resolving or reading the storage is treated as
            "does not exist".
    """
    # TODO: Add an `exists` method to commonly used storage blocks
    # so the entire payload doesn't need to be read
    try:
        if self.metadata_storage is not None:
            storage = self.metadata_storage
        else:
            if self.result_storage is None:
                # Without this, an unset `result_storage` always produced
                # False via a swallowed AttributeError.
                self.result_storage = await get_default_result_storage()
            storage = self.result_storage
        content = await storage.read_path(key)
    except Exception:
        # Deliberately broad: storage backends signal a missing path with
        # different exception types, and this check is best-effort.
        return False
    return content is not None

def exists(self, key: str) -> bool:
    """
    Report whether a result record is stored under `key` (blocking form).

    Args:
        key: The storage key to look up.

    Returns:
        bool: True if the result record exists, False otherwise.
    """
    # Delegate to the shared implementation, requesting its synchronous form.
    found = self._exists(key=key, _sync=True)
    return found

async def aexists(self, key: str) -> bool:
    """
    Report whether a result record is stored under `key` (awaitable form).

    Args:
        key: The storage key to look up.

    Returns:
        bool: True if the result record exists, False otherwise.
    """
    # Delegate to the shared implementation and await its asynchronous form.
    pending = self._exists(key=key, _sync=False)
    return await pending

@sync_compatible
async def _read(self, key: str) -> "ResultRecord":
"""
Expand All @@ -255,8 +314,19 @@ async def _read(self, key: str) -> "ResultRecord":
if self.result_storage is None:
self.result_storage = await get_default_result_storage()

content = await self.result_storage.read_path(f"{key}")
return ResultRecord.deserialize(content)
if self.metadata_storage is not None:
metadata_content = await self.metadata_storage.read_path(key)
metadata = ResultRecordMetadata.load_bytes(metadata_content)
assert (
metadata.storage_key is not None
), "Did not find storage key in metadata"
result_content = await self.result_storage.read_path(metadata.storage_key)
return ResultRecord.deserialize_from_result_and_metadata(
result=result_content, metadata=metadata_content
)
else:
content = await self.result_storage.read_path(key)
return ResultRecord.deserialize(content)

def read(self, key: str) -> "ResultRecord":
"""
Expand Down Expand Up @@ -300,8 +370,6 @@ async def _write(
obj: The object to write to storage.
expiration: The expiration time for the result record.
"""
if self.result_storage is None:
self.result_storage = await get_default_result_storage()
key = key or self.storage_key_fn()

record = ResultRecord(
Expand Down Expand Up @@ -347,9 +415,24 @@ async def _persist_result_record(self, result_record: "ResultRecord"):
if self.result_storage is None:
self.result_storage = await get_default_result_storage()

await self.result_storage.write_path(
result_record.metadata.storage_key, content=result_record.serialize()
)
assert (
result_record.metadata.storage_key is not None
), "Storage key is required on result record"
# If metadata storage is configured, write result and metadata separately
if self.metadata_storage is not None:
await self.result_storage.write_path(
result_record.metadata.storage_key,
content=result_record.serialize_result(),
)
await self.metadata_storage.write_path(
result_record.metadata.storage_key,
content=result_record.serialize_metadata(),
)
# Otherwise, write the result metadata and result together
else:
await self.result_storage.write_path(
result_record.metadata.storage_key, content=result_record.serialize()
)

def persist_result_record(self, result_record: "ResultRecord"):
"""
Expand Down
63 changes: 63 additions & 0 deletions tests/results/test_result_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -731,3 +731,66 @@ def foo():
result_store = foo()

assert result_store.persist_result is persist_result


async def test_result_store_read_and_write_with_metadata_storage(tmp_path):
    """
    Round-trip a value through a store that keeps metadata and result data
    in separate local filesystems, then inspect both files on disk.
    """
    metadata_dir = tmp_path / "metadata"
    results_dir = tmp_path / "results"
    store = ResultStore(
        metadata_storage=LocalFileSystem(basepath=metadata_dir),
        result_storage=LocalFileSystem(basepath=results_dir),
    )

    key = "test"
    value = "test"
    await store.awrite(key=key, obj=value)
    record = await store.aread(key=key)
    assert record.result == value

    # The result storage file must hold only the serialized result payload.
    raw_result = (results_dir / key).read_bytes()
    assert store.serializer.loads(raw_result) == value

    # The metadata storage file must hold the JSON dump of the metadata.
    stored_metadata = (metadata_dir / key).read_text()
    assert stored_metadata == record.metadata.model_dump_json(serialize_as_any=True)


async def test_result_store_exists_with_metadata_storage(tmp_path):
    """
    With metadata storage configured, existence follows the metadata file:
    deleting it makes the record disappear for both `exists` and `aexists`.
    """
    metadata_dir = tmp_path / "metadata"
    store = ResultStore(
        metadata_storage=LocalFileSystem(basepath=metadata_dir),
        result_storage=LocalFileSystem(basepath=tmp_path / "results"),
    )

    key = "test"
    await store.awrite(key=key, obj="test")

    # A written key is found and an unknown key is not, via both entry points.
    assert await store.aexists(key=key) is True
    assert await store.aexists(key="nonexistent") is False
    assert store.exists(key=key) is True
    assert store.exists(key="nonexistent") is False

    # Drop only the metadata file; the record must no longer be reported.
    metadata_file = metadata_dir / key
    metadata_file.unlink()
    assert await store.aexists(key=key) is False
    assert store.exists(key=key) is False


async def test_result_store_exists_with_no_metadata_storage(tmp_path):
    """
    Without metadata storage, existence follows the file in result storage:
    deleting it makes the record disappear for both `exists` and `aexists`.
    """
    results_dir = tmp_path / "results"
    store = ResultStore(result_storage=LocalFileSystem(basepath=results_dir))

    key = "test"
    await store.awrite(key=key, obj="test")
    assert await store.aexists(key=key) is True
    assert store.exists(key=key) is True

    # Drop the result file; the record must no longer be reported.
    result_file = results_dir / key
    result_file.unlink()
    assert await store.aexists(key=key) is False
    assert store.exists(key=key) is False

0 comments on commit 1f9bed1

Please sign in to comment.