def compress_tarball(source: os.PathLike, output: os.PathLike) -> None:
    """Compress the code tarball at ``source`` into a gzip archive at ``output``.

    If a ``pigz`` executable is found on ``PATH`` it is used (parallel gzip);
    otherwise the standard-library ``gzip`` module is used. In both cases the
    modification time is kept out of the gzip header so the timestamp of the
    intermediate tarball does not leak into the archive bytes.

    :param source: Path of the uncompressed tarball to read.
    :param output: Path of the gzip archive to create (overwritten if present).
    :raises subprocess.CalledProcessError: If ``pigz`` exits with a non-zero status.
    :raises OSError: If ``source`` cannot be read or ``output`` cannot be written.
    """
    if pigz := shutil.which("pigz"):
        with open(output, "wb") as gzipped:
            # --no-time mirrors mtime=0 in the gzip fallback below: without it pigz
            # records the source file's mtime in the header.
            subprocess.run([pigz, "--no-time", "-c", source], stdout=gzipped, check=True)
        return

    # Monotonic clock: the elapsed time must not be affected by wall-clock adjustments.
    start_time = time.monotonic()
    with gzip.GzipFile(filename=output, mode="wb", mtime=0) as gzipped:
        with open(source, "rb") as source_file:
            # Stream in chunks instead of loading the whole tarball into memory.
            shutil.copyfileobj(source_file, gzipped)
    elapsed = time.monotonic() - start_time

    warning_time = 10  # seconds of compression after which pigz is suggested
    if elapsed > warning_time:
        click.secho(
            f"Code tarball compression took {elapsed:.0f} seconds. Consider installing `pigz` for faster compression.",
            fg="yellow",
        )