Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/mao3267/flytekit into inp…
Browse files Browse the repository at this point in the history
…ut-through-file-and-pipe
  • Loading branch information
mao3267 committed Aug 9, 2024
2 parents b2cb7b1 + 9666f15 commit e63194b
Show file tree
Hide file tree
Showing 32 changed files with 546 additions and 97 deletions.
7 changes: 3 additions & 4 deletions .github/workflows/monodocs_build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ jobs:
steps:
- name: Fetch flytekit code
uses: actions/checkout@v4
with:
path: "${{ github.workspace }}/flytekit"
- name: 'Clear action cache'
uses: ./.github/actions/clear-action-cache
- name: Fetch flyte code
uses: actions/checkout@v4
with:
Expand All @@ -41,7 +41,6 @@ jobs:
export SETUPTOOLS_SCM_PRETEND_VERSION="2.0.0"
pip install -e ./flyteidl
- shell: bash -el {0}
working-directory: ${{ github.workspace }}/flytekit
run: |
conda activate monodocs-env
pip install -e .
Expand All @@ -54,7 +53,7 @@ jobs:
working-directory: ${{ github.workspace }}/flyte
shell: bash -el {0}
env:
FLYTEKIT_LOCAL_PATH: ${{ github.workspace }}/flytekit
FLYTEKIT_LOCAL_PATH: ${{ github.workspace }}
run: |
conda activate monodocs-env
make -C docs clean html SPHINXOPTS="-W -vvv"
3 changes: 2 additions & 1 deletion .github/workflows/pythonbuild.yml
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ jobs:
matrix:
os: [ubuntu-latest]
python-version: ${{fromJson(needs.detect-python-versions.outputs.python-versions)}}
makefile-cmd: [integration_test_codecov, integration_test_lftransfers_codecov]
steps:
# As described in https://github.com/pypa/setuptools_scm/issues/414, SCM needs git history
# and tags to work.
Expand Down Expand Up @@ -297,7 +298,7 @@ jobs:
FLYTEKIT_CI: 1
PYTEST_OPTS: -n2
run: |
make integration_test_codecov
make ${{ matrix.makefile-cmd }}
- name: Codecov
uses: codecov/codecov-action@v3.1.0
with:
Expand Down
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ RUN apt-get update && apt-get install build-essential -y \
&& apt-get clean autoclean \
&& apt-get autoremove --yes \
&& rm -rf /var/lib/{apt,dpkg,cache,log}/ \
&& rm -rf /root/.cache/pip \
&& useradd -u 1000 flytekit \
&& chown flytekit: /root \
&& chown flytekit: /home \
Expand Down
10 changes: 9 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,15 @@ integration_test_codecov:

.PHONY: integration_test
integration_test:
$(PYTEST_AND_OPTS) tests/flytekit/integration ${CODECOV_OPTS}
$(PYTEST_AND_OPTS) tests/flytekit/integration ${CODECOV_OPTS} -m "not lftransfers"

.PHONY: integration_test_lftransfers_codecov
integration_test_lftransfers_codecov:
$(MAKE) CODECOV_OPTS="--cov=./ --cov-report=xml --cov-append" integration_test_lftransfers

.PHONY: integration_test_lftransfers
integration_test_lftransfers:
$(PYTEST) tests/flytekit/integration ${CODECOV_OPTS} -m "lftransfers"

doc-requirements.txt: export CUSTOM_COMPILE_COMMAND := make doc-requirements.txt
doc-requirements.txt: doc-requirements.in install-piptools
Expand Down
3 changes: 2 additions & 1 deletion flytekit/clients/raw.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ def __init__(self, cfg: PlatformConfig, **kwargs):
# Set the value here to match the limit in Admin, otherwise the client will cut off and the user gets a
# StreamRemoved exception.
# https://github.com/flyteorg/flyte/blob/e8588f3a04995a420559327e78c3f95fbf64dc01/flyteadmin/pkg/common/constants.go#L14
options = (("grpc.max_metadata_size", 32000),)
# 32KB for error messages, 20MB for actual messages.
options = (("grpc.max_metadata_size", 32 * 1024), ("grpc.max_receive_message_length", 20 * 1024 * 1024))
self._cfg = cfg
self._channel = wrap_exceptions_channel(
cfg,
Expand Down
2 changes: 2 additions & 0 deletions flytekit/clis/sdk_in_container/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -770,6 +770,8 @@ def _get_entities(self, r: FlyteRemote, project: str, domain: str, limit: int) -
return []

def list_commands(self, ctx):
if "--help" in sys.argv:
return []
if self._entities or ctx.obj is None:
return self._entities

Expand Down
10 changes: 7 additions & 3 deletions flytekit/core/base_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ def local_execute(
except TypeTransformerFailedError as exc:
msg = f"Failed to convert inputs of task '{self.name}':\n {exc}"
logger.error(msg)
raise TypeError(msg) from exc
raise TypeError(msg) from None
input_literal_map = _literal_models.LiteralMap(literals=literals)

# if metadata.cache is set, check memoized version
Expand Down Expand Up @@ -636,7 +636,11 @@ def _output_to_literal_map(self, native_outputs: Dict[int, Any], ctx: FlyteConte
except Exception as e:
# only show the name of output key if it's user-defined (by default Flyte names these as "o<n>")
key = k if k != f"o{i}" else i
msg = f"Failed to convert outputs of task '{self.name}' at position {key}:\n {e}"
msg = (
f"Failed to convert outputs of task '{self.name}' at position {key}.\n"
f"Failed to convert type {type(native_outputs_as_map[expected_output_names[i]])} to type {py_type}.\n"
f"Error Message: {e}."
)
logger.error(msg)
raise TypeError(msg) from e
# Now check if there is any output metadata associated with this output variable and attach it to the
Expand Down Expand Up @@ -724,7 +728,7 @@ def dispatch_execute(
except Exception as exc:
msg = f"Failed to convert inputs of task '{self.name}':\n {exc}"
logger.error(msg)
raise type(exc)(msg) from exc
raise type(exc)(msg) from None

# TODO: Logger should auto inject the current context information to indicate if the task is running within
# a workflow or a subworkflow etc
Expand Down
2 changes: 1 addition & 1 deletion flytekit/core/launch_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,7 @@ def reference_launch_plan(
"""

def wrapper(fn) -> ReferenceLaunchPlan:
interface = transform_function_to_interface(fn)
interface = transform_function_to_interface(fn, is_reference_entity=True)
return ReferenceLaunchPlan(project, domain, name, version, interface.inputs, interface.outputs)

return wrapper
13 changes: 9 additions & 4 deletions flytekit/core/promise.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import typing
from copy import deepcopy
from enum import Enum
from typing import Any, Coroutine, Dict, Hashable, List, Optional, Set, Tuple, Union, cast, get_args
from typing import Any, Coroutine, Dict, List, Optional, Set, Tuple, Union, cast, get_args

from google.protobuf import struct_pb2 as _struct
from typing_extensions import Protocol
Expand Down Expand Up @@ -94,7 +94,7 @@ def my_wf(in1: int, in2: int) -> int:
v = resolve_attr_path_in_promise(v)
result[k] = TypeEngine.to_literal(ctx, v, t, var.type)
except TypeTransformerFailedError as exc:
raise TypeTransformerFailedError(f"Failed argument '{k}': {exc}") from exc
raise TypeTransformerFailedError(f"Failed argument '{k}': {exc}") from None

return result

Expand Down Expand Up @@ -1116,8 +1116,13 @@ def create_and_link_node(
or UnionTransformer.is_optional_type(interface.inputs_with_defaults[k][0])
):
default_val = interface.inputs_with_defaults[k][1]
if not isinstance(default_val, Hashable):
raise _user_exceptions.FlyteAssertion("Cannot use non-hashable object as default argument")
# Common cases of mutable default arguments, as described in https://www.pullrequest.com/blog/python-pitfalls-the-perils-of-using-lists-and-dicts-as-default-arguments/
# or https://florimond.dev/en/posts/2018/08/python-mutable-defaults-are-the-source-of-all-evil, are not supported.
# As of 2024-08-05, Python native sets are not supported in Flytekit. However, they are included here for completeness.
if isinstance(default_val, list) or isinstance(default_val, dict) or isinstance(default_val, set):
raise _user_exceptions.FlyteAssertion(
f"Argument {k} for function {entity.name} is a mutable default argument, which is a python anti-pattern and not supported in flytekit tasks"
)
kwargs[k] = default_val
else:
error_msg = f"Input {k} of type {interface.inputs[k]} was not specified for function {entity.name}"
Expand Down
2 changes: 1 addition & 1 deletion flytekit/core/type_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -1155,7 +1155,7 @@ def literal_map_to_kwargs(
try:
kwargs[k] = TypeEngine.to_python_value(ctx, lm.literals[k], python_interface_inputs[k])
except TypeTransformerFailedError as exc:
raise TypeTransformerFailedError(f"Error converting input '{k}' at position {i}:\n {exc}") from exc
raise TypeTransformerFailedError(f"Error converting input '{k}' at position {i}:\n {exc}") from None
return kwargs

@classmethod
Expand Down
2 changes: 1 addition & 1 deletion flytekit/deck/renderer.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ def to_html(self) -> str:
.replace("\\n", "\n")
.rstrip()
)
except subprocess.CalledProcessError as e:
except Exception as e:
logger.error(f"Error occurred while fetching installed packages: {e}")
return "Error occurred while fetching installed packages."

Expand Down
21 changes: 15 additions & 6 deletions flytekit/extras/tasks/shell.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,17 +76,26 @@ def subproc_execute(command: typing.Union[List[str], str], **kwargs) -> ProcessR

kwargs = {**defaults, **kwargs}

try:
# Execute the command and capture stdout and stderr
result = subprocess.run(command, **kwargs)
print(result.check_returncode())

if "|" in command and kwargs.get("shell"):
if kwargs.get("shell"):
if "|" in command:
logger.warning(
"""Found a pipe in the command and shell=True.
This can lead to silent failures if subsequent commands
succeed despite previous failures."""
)
if type(command) == list:
logger.warning(
"""Found `command` formatted as a list instead of a string with shell=True.
With this configuration, the first member of the list will be
executed and the remaining arguments will be passed as arguments
to the shell instead of to the binary being called. This may not
be intended behavior and may lead to confusing failures."""
)

try:
# Execute the command and capture stdout and stderr
result = subprocess.run(command, **kwargs)
result.check_returncode()

# Access the stdout and stderr output
return ProcessResult(result.returncode, result.stdout, result.stderr)
Expand Down
17 changes: 17 additions & 0 deletions flytekit/image_spec/image_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,23 @@ def __post_init__(self):
if self.registry:
self.registry = self.registry.lower()

parameters_str_list = [
"packages",
"conda_channels",
"conda_packages",
"apt_packages",
"pip_extra_index_url",
"entrypoint",
"commands",
]
for parameter in parameters_str_list:
attr = getattr(self, parameter)
parameter_is_None = attr is None
parameter_is_list_string = isinstance(attr, list) and all(isinstance(v, str) for v in attr)
if not (parameter_is_None or parameter_is_list_string):
error_msg = f"{parameter} must be a list of strings or None"
raise ValueError(error_msg)

def image_name(self) -> str:
"""Full image name with tag."""
image_name = self._image_name()
Expand Down
2 changes: 2 additions & 0 deletions flytekit/models/filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ def __init__(self, key, values):
:param Text key: The name of the field to compare against
:param list[Text] values: A list of textual values to compare.
"""
if not isinstance(values, list):
raise TypeError(f"values must be a list. but got {type(values)}")
super(SetFilter, self).__init__(key, ";".join(values))

@classmethod
Expand Down
28 changes: 19 additions & 9 deletions flytekit/remote/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,7 @@ def raw_register(
raise RegistrationSkipped(f"Remote task/Workflow {cp_entity.name} is not registrable.")
else:
logger.debug(f"Skipping registration of remote entity: {cp_entity.name}")
raise RegistrationSkipped(f"Remote task/Workflow {cp_entity.name} is not registrable.")
raise RegistrationSkipped(f"Remote entity {cp_entity.name} is not registrable.")

if isinstance(
cp_entity,
Expand Down Expand Up @@ -768,13 +768,23 @@ async def _serialize_and_register(
functools.partial(self.raw_register, cp_entity, serialization_settings, version, og_entity=entity),
)
)
ident = []
ident.extend(await asyncio.gather(*tasks))

identifiers_or_exceptions = []
identifiers_or_exceptions.extend(await asyncio.gather(*tasks, return_exceptions=True))
# Check to make sure any exceptions are just registration skipped exceptions
for ie in identifiers_or_exceptions:
if isinstance(ie, RegistrationSkipped):
logger.info(f"Skipping registration... {ie}")
continue
if isinstance(ie, Exception):
raise ie
# serial register
cp_other_entities = OrderedDict(filter(lambda x: not isinstance(x[1], task_models.TaskSpec), m.items()))
for entity, cp_entity in cp_other_entities.items():
ident.append(self.raw_register(cp_entity, serialization_settings, version, og_entity=entity))
return ident[-1]
identifiers_or_exceptions.append(
self.raw_register(cp_entity, serialization_settings, version, og_entity=entity)
)
return identifiers_or_exceptions[-1]

def register_task(
self,
Expand Down Expand Up @@ -901,14 +911,14 @@ def upload_file(
extra_headers = self.get_extra_headers_for_protocol(upload_location.native_url)
extra_headers.update(upload_location.headers)
encoded_md5 = b64encode(md5_bytes)
with open(str(to_upload), "+rb") as local_file:
content = local_file.read()
content_length = len(content)
local_file_path = str(to_upload)
content_length = os.stat(local_file_path).st_size
with open(local_file_path, "+rb") as local_file:
headers = {"Content-Length": str(content_length), "Content-MD5": encoded_md5}
headers.update(extra_headers)
rsp = requests.put(
upload_location.signed_url,
data=content,
data=local_file, # NOTE: We pass the file object directly to stream our upload.
headers=headers,
verify=False
if self._config.platform.insecure_skip_verify is True
Expand Down
34 changes: 11 additions & 23 deletions flytekit/types/file/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,38 +309,26 @@ def open(
cache_type: typing.Optional[str] = None,
cache_options: typing.Optional[typing.Dict[str, typing.Any]] = None,
):
"""
Returns a streaming File handle
"""Returns a streaming File handle
.. code-block:: python
@task
def copy_file(ff: FlyteFile) -> FlyteFile:
new_file = FlyteFile.new_remote_file(ff.name)
with ff.open("rb", cache_type="readahead", cache={}) as r:
new_file = FlyteFile.new_remote_file()
with ff.open("rb", cache_type="readahead") as r:
with new_file.open("wb") as w:
w.write(r.read())
return new_file
Alternatively,
.. code-block:: python
@task
def copy_file(ff: FlyteFile) -> FlyteFile:
new_file = FlyteFile.new_remote_file(ff.name)
with fsspec.open(f"readahead::{ff.remote_path}", "rb", readahead={}) as r:
with new_file.open("wb") as w:
w.write(r.read())
return new_file
:param mode: str Open mode like 'rb', 'rt', 'wb', ...
:param cache_type: optional str Specify if caching is to be used. Cache protocol can be ones supported by
fsspec https://filesystem-spec.readthedocs.io/en/latest/api.html#readbuffering,
especially useful for large file reads
:param cache_options: optional Dict[str, Any] Refer to fsspec caching options. This is strongly coupled to the
cache_protocol
:param mode: Open mode. For example: 'r', 'w', 'rb', 'rt', 'wb', etc.
:type mode: str
:param cache_type: Specifies the cache type. Possible values are "blockcache", "bytes", "mmap", "readahead", "first", or "background".
This is especially useful for large file reads. See https://filesystem-spec.readthedocs.io/en/latest/api.html#readbuffering.
:type cache_type: str, optional
:param cache_options: A Dict corresponding to the parameters for the chosen cache_type.
Refer to fsspec caching options above.
:type cache_options: Dict[str, Any], optional
"""
ctx = FlyteContextManager.current_context()
final_path = self.path
Expand Down
Loading

0 comments on commit e63194b

Please sign in to comment.