Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Core/enable deck #2314

Merged
merged 16 commits into from
Jul 2, 2024
2 changes: 1 addition & 1 deletion flytekit/bin/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ def _dispatch_execute(
logger.info(f"Engine folder written successfully to the output prefix {output_prefix}")

if not getattr(task_def, "disable_deck", True):
_output_deck(task_def.name.split(".")[-1], ctx.user_space_params)
_output_deck(task_def.name.split(".")[-1], ctx.user_space_params.with_rendered_decks(task_def.decks).build())

logger.debug("Finished _dispatch_execute")

Expand Down
58 changes: 43 additions & 15 deletions flytekit/core/base_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
from flytekit.core.tracker import TrackedInstance
from flytekit.core.type_engine import TypeEngine, TypeTransformerFailedError
from flytekit.core.utils import timeit
from flytekit.deck.deck import DeckFields
from flytekit.loggers import logger
from flytekit.models import dynamic_job as _dynamic_job
from flytekit.models import interface as _interface_models
Expand Down Expand Up @@ -462,6 +463,7 @@
environment: Optional[Dict[str, str]] = None,
disable_deck: Optional[bool] = None,
enable_deck: Optional[bool] = None,
decks: Optional[Tuple[str, ...]] = None,
pingsutw marked this conversation as resolved.
Show resolved Hide resolved
**kwargs,
):
"""
Expand All @@ -477,6 +479,8 @@
execution of the task. Supplied as a dictionary of key/value pairs
disable_deck (bool): (deprecated) If true, this task will not output deck html file
enable_deck (bool): If true, this task will output deck html file
decks (Tuple[str]): Tuple of decks to be
generated for this task. Valid values can be selected from fields of ``flytekit.deck.deck.DeckFields`` enum
pingsutw marked this conversation as resolved.
Show resolved Hide resolved
"""
super().__init__(
task_type=task_type,
Expand All @@ -487,23 +491,35 @@
self._python_interface = interface if interface else Interface()
self._environment = environment if environment else {}
self._task_config = task_config
self._decks = list(decks) if (decks is not None and (enable_deck is True or disable_deck is False)) else []

deck_members = set([_field.value for _field in DeckFields])
# enumerate additional decks, check if any of them are invalid
for deck in self._decks:
if deck not in deck_members:
raise ValueError(
f"Element {deck} from decks param is not a valid deck field. Please use one of {deck_members}"
)
pingsutw marked this conversation as resolved.
Show resolved Hide resolved

# first we resolve the conflict between params regarding decks, if any two of [disable_deck, enable_deck]
# are set, we raise an error
configured_deck_params = [disable_deck is not None, enable_deck is not None]
if sum(configured_deck_params) > 1:
raise ValueError("only one of [disable_deck, enable_deck] can be set")

if disable_deck is not None:
warnings.warn(
"disable_deck was deprecated in 1.10.0, please use enable_deck instead",
"disable_deck was deprecated in 1.10.0, please use enable_deck and decks instead",
FutureWarning,
)

# Confirm that disable_deck and enable_deck do not contradict each other
if disable_deck is not None and enable_deck is not None:
raise ValueError("disable_deck and enable_deck cannot both be set at the same time")

if enable_deck is not None:
self._disable_deck = not enable_deck
elif disable_deck is not None:
self._disable_deck = disable_deck
else:
self._disable_deck = True

if self._python_interface.docstring:
if self.docs is None:
self._docs = Documentation(
Expand Down Expand Up @@ -643,18 +659,20 @@

def _write_decks(self, native_inputs, native_outputs_as_map, ctx, new_user_params):
if self._disable_deck is False:
from flytekit.deck.deck import Deck, _output_deck
from flytekit.deck.deck import Deck, DeckFields, _output_deck

INPUT = "Inputs"
OUTPUT = "Outputs"
INPUT = DeckFields.INPUT
OUTPUT = DeckFields.OUTPUT

input_deck = Deck(INPUT)
for k, v in native_inputs.items():
input_deck.append(TypeEngine.to_html(ctx, v, self.get_type_for_input_var(k, v)))
if DeckFields.INPUT in self.decks:
input_deck = Deck(INPUT.value)
for k, v in native_inputs.items():
input_deck.append(TypeEngine.to_html(ctx, v, self.get_type_for_input_var(k, v)))

output_deck = Deck(OUTPUT)
for k, v in native_outputs_as_map.items():
output_deck.append(TypeEngine.to_html(ctx, v, self.get_type_for_output_var(k, v)))
if DeckFields.OUTPUT in self.decks:
output_deck = Deck(OUTPUT.value)
for k, v in native_outputs_as_map.items():
output_deck.append(TypeEngine.to_html(ctx, v, self.get_type_for_output_var(k, v)))

if ctx.execution_state and ctx.execution_state.is_local_execution():
# When we run the workflow remotely, flytekit outputs decks at the end of _dispatch_execute
Expand Down Expand Up @@ -755,7 +773,10 @@

This should return either the same context of the mutated context
"""
return user_params
if user_params is None:
return user_params

Check warning on line 777 in flytekit/core/base_task.py

View check run for this annotation

Codecov / codecov/patch

flytekit/core/base_task.py#L777

Added line #L777 was not covered by tests
new_param = user_params.with_rendered_decks(self.decks).build()
return new_param

@abstractmethod
def execute(self, **kwargs) -> Any:
Expand Down Expand Up @@ -789,6 +810,13 @@
"""
return self._disable_deck

@property
def decks(self) -> List[str]:
"""
If not empty, this task will output deck html file for the specified decks
"""
return self._decks


class TaskResolverMixin(object):
"""
Expand Down
21 changes: 19 additions & 2 deletions flytekit/core/context_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ class Builder(object):
execution_date: typing.Optional[datetime] = None
logging: Optional[_logging.Logger] = None
task_id: typing.Optional[_identifier.Identifier] = None
rendered_decks: Optional[List[str]] = None
pingsutw marked this conversation as resolved.
Show resolved Hide resolved
output_metadata_prefix: Optional[str] = None
pingsutw marked this conversation as resolved.
Show resolved Hide resolved

def __init__(self, current: typing.Optional[ExecutionParameters] = None):
self.stats = current.stats if current else None
Expand All @@ -101,6 +103,8 @@ def __init__(self, current: typing.Optional[ExecutionParameters] = None):
self.attrs = current._attrs if current else {}
self.raw_output_prefix = current.raw_output_prefix if current else None
self.task_id = current.task_id if current else None
self.rendered_decks = current.rendered_decks if current else []
self.output_metadata_prefix = current.output_metadata_prefix if current else None

def add_attr(self, key: str, v: typing.Any) -> ExecutionParameters.Builder:
self.attrs[key] = v
Expand All @@ -119,6 +123,8 @@ def build(self) -> ExecutionParameters:
decks=self.decks,
raw_output_prefix=self.raw_output_prefix,
task_id=self.task_id,
rendered_decks=self.rendered_decks,
output_metadata_prefix=self.output_metadata_prefix,
**self.attrs,
)

Expand All @@ -140,6 +146,11 @@ def with_task_sandbox(self) -> Builder:
b.working_dir = task_sandbox_dir
return b

def with_rendered_decks(self, rendered_decks: List[str]) -> Builder:
b = self.new_builder(self)
b.rendered_decks = rendered_decks
return b

def builder(self) -> Builder:
return ExecutionParameters.Builder(current=self)

Expand All @@ -155,6 +166,7 @@ def __init__(
checkpoint=None,
decks=None,
task_id: typing.Optional[_identifier.Identifier] = None,
rendered_decks=None,
**kwargs,
):
"""
Expand Down Expand Up @@ -182,6 +194,7 @@ def __init__(
self._checkpoint = checkpoint
self._decks = decks
self._task_id = task_id
self._rendered_decks = [] if rendered_decks is None else rendered_decks

@property
def stats(self) -> taggable.TaggableStats:
Expand Down Expand Up @@ -274,18 +287,22 @@ def default_deck(self) -> Deck:

@property
def timeline_deck(self) -> "TimeLineDeck": # type: ignore
from flytekit.deck.deck import TimeLineDeck
from flytekit.deck.deck import DeckFields, TimeLineDeck

time_line_deck = None
for deck in self.decks:
if isinstance(deck, TimeLineDeck):
time_line_deck = deck
break
if time_line_deck is None:
time_line_deck = TimeLineDeck("Timeline")
time_line_deck = TimeLineDeck(DeckFields.TIMELINE.value)

return time_line_deck

@property
def rendered_decks(self) -> List[str]:
return self._rendered_decks

def __getattr__(self, attr_name: str) -> typing.Any:
"""
This houses certain task specific context. For example in Spark, it houses the SparkSession, etc
Expand Down
5 changes: 3 additions & 2 deletions flytekit/core/python_function_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,18 +351,19 @@ def dynamic_execute(self, task_function: Callable, **kwargs) -> Any:
def _write_decks(self, native_inputs, native_outputs_as_map, ctx, new_user_params):
if self._disable_deck is False:
from flytekit.deck import Deck
from flytekit.deck.deck import DeckFields
from flytekit.deck.renderer import PythonDependencyRenderer

# These errors are raised if the source code can not be retrieved
with suppress(OSError, TypeError):
source_code = inspect.getsource(self._task_function)
from flytekit.deck.renderer import SourceCodeRenderer

source_code_deck = Deck("Source Code")
source_code_deck = Deck(DeckFields.SOURCE_CODE.value)
renderer = SourceCodeRenderer()
source_code_deck.append(renderer.to_html(source_code))

python_dependencies_deck = Deck("Dependencies")
python_dependencies_deck = Deck(DeckFields.DEPENDENCIES.value)
renderer = PythonDependencyRenderer()
python_dependencies_deck.append(renderer.to_html())

Expand Down
5 changes: 5 additions & 0 deletions flytekit/core/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ def task(
docs: Optional[Documentation] = ...,
disable_deck: Optional[bool] = ...,
enable_deck: Optional[bool] = ...,
decks: Optional[Tuple[str, ...]] = ...,
pod_template: Optional["PodTemplate"] = ...,
pod_template_name: Optional[str] = ...,
accelerator: Optional[BaseAccelerator] = ...,
Expand Down Expand Up @@ -152,6 +153,7 @@ def task(
docs: Optional[Documentation] = ...,
disable_deck: Optional[bool] = ...,
enable_deck: Optional[bool] = ...,
decks: Optional[Tuple[str, ...]] = ...,
pod_template: Optional["PodTemplate"] = ...,
pod_template_name: Optional[str] = ...,
accelerator: Optional[BaseAccelerator] = ...,
Expand Down Expand Up @@ -189,6 +191,7 @@ def task(
docs: Optional[Documentation] = None,
disable_deck: Optional[bool] = None,
enable_deck: Optional[bool] = None,
decks: Optional[Tuple[str, ...]] = ("source_code", "dependencies"),
pod_template: Optional["PodTemplate"] = None,
pod_template_name: Optional[str] = None,
accelerator: Optional[BaseAccelerator] = None,
Expand Down Expand Up @@ -309,6 +312,7 @@ def launch_dynamically():
:param task_resolver: Provide a custom task resolver.
:param disable_deck: (deprecated) If true, this task will not output deck html file
:param enable_deck: If true, this task will output deck html file
:param decks: If specified and enble_deck is True, this task will output deck html file with the fields specified in the list
:param docs: Documentation about this task
:param pod_template: Custom PodTemplate for this task.
:param pod_template_name: The name of the existing PodTemplate resource which will be used in this task.
Expand Down Expand Up @@ -341,6 +345,7 @@ def wrapper(fn: Callable[..., Any]) -> PythonFunctionTask[T]:
task_resolver=task_resolver,
disable_deck=disable_deck,
enable_deck=enable_deck,
decks=decks,
docs=docs,
pod_template=pod_template,
pod_template_name=pod_template_name,
Expand Down
22 changes: 21 additions & 1 deletion flytekit/deck/deck.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import enum
import os
import typing
from typing import Optional
Expand All @@ -10,6 +11,18 @@
DECK_FILE_NAME = "deck.html"


class DeckFields(str, enum.Enum):
pingsutw marked this conversation as resolved.
Show resolved Hide resolved
"""
DeckFields is used to specify the fields that will be rendered in the deck.
"""

INPUT = "input"
OUTPUT = "output"
SOURCE_CODE = "source_code"
TIMELINE = "Timeline"
thomasjpfan marked this conversation as resolved.
Show resolved Hide resolved
DEPENDENCIES = "dependencies"


class Deck:
"""
Deck enable users to get customizable and default visibility into their tasks.
Expand Down Expand Up @@ -128,7 +141,14 @@ def _get_deck(
Get flyte deck html string
If ignore_jupyter is set to True, then it will return a str even in a jupyter environment.
"""
deck_map = {deck.name: deck.html for deck in new_user_params.decks}
deck_members = set([_field.value for _field in DeckFields])
rendered_decks = new_user_params.rendered_decks
deck_map = {
deck.name: deck.html
for deck in new_user_params.decks
if deck.name in rendered_decks or deck.name not in deck_members
}

raw_html = get_deck_template().render(metadata=deck_map)
if not ignore_jupyter and ipython_check():
try:
Expand Down
10 changes: 5 additions & 5 deletions tests/flytekit/unit/core/test_flyte_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ def my_wf() -> FlyteFile:
def test_file_handling_remote_file_handling():
SAMPLE_DATA = "https://raw.githubusercontent.com/jbrownlee/Datasets/master/pima-indians-diabetes.data.csv"

@task
@task(enable_deck=False)
def t1() -> FlyteFile:
return SAMPLE_DATA

Expand Down Expand Up @@ -312,7 +312,7 @@ def my_wf() -> FlyteFile:
def test_file_handling_remote_file_handling_flyte_file():
SAMPLE_DATA = "https://raw.githubusercontent.com/jbrownlee/Datasets/master/pima-indians-diabetes.data.csv"

@task
@task(enable_deck=False)
def t1() -> FlyteFile:
# Unlike the test above, this returns the remote path wrapped in a FlyteFile object
return FlyteFile(SAMPLE_DATA)
Expand Down Expand Up @@ -606,23 +606,23 @@ def test_for_downloading():

@pytest.mark.sandbox_test
def test_file_open_things():
@task
@task(enable_deck=False)
def write_this_file_to_s3() -> FlyteFile:
ctx = FlyteContextManager.current_context()
r = ctx.file_access.get_random_string()
dest = ctx.file_access.join(ctx.file_access.raw_output_prefix, r)
ctx.file_access.put(__file__, dest)
return FlyteFile(path=dest)

@task
@task(enable_deck=False)
def copy_file(ff: FlyteFile) -> FlyteFile:
new_file = FlyteFile.new_remote_file(ff.remote_path)
with ff.open("r") as r:
with new_file.open("w") as w:
w.write(r.read())
return new_file

@task
@task(enable_deck=False)
def print_file(ff: FlyteFile):
with open(ff, "r") as fh:
print(len(fh.readlines()))
Expand Down
Loading
Loading