Skip to content

Commit

Permalink
Feat: Optionally use pigz to speed up tarball compression (#2729)
Browse files Browse the repository at this point in the history
Signed-off-by: Fabio Graetz <fabiograetz@googlemail.com>
  • Loading branch information
fg91 authored Sep 10, 2024
1 parent a366653 commit 7290c2a
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 6 deletions.
4 changes: 4 additions & 0 deletions docs/source/design/clis.rst
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ Suppose you execute a script that defines 10 tasks and a workflow that calls onl

It is considered fast registration because when a script is executed using ``pyflyte run``, the script is bundled up and uploaded to FlyteAdmin. When the task is executed in the backend, this zipped file is extracted and used.

.. note ::
If `pigz <https://zlib.net/pigz/>`_ is installed, it will be leveraged by ``pyflyte`` to accelerate the compression of the code tarball.
.. _pyflyte-register:

What is ``pyflyte register``?
Expand Down
30 changes: 24 additions & 6 deletions flytekit/tools/fast_registration.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
import os
import pathlib
import posixpath
import shutil
import subprocess
import sys
import tarfile
import tempfile
import time
import typing
from dataclasses import dataclass
from enum import Enum
Expand Down Expand Up @@ -75,6 +77,26 @@ def print_ls_tree(source: os.PathLike, ls: typing.List[str]):
rich_print(tree_root)


def compress_tarball(source: os.PathLike, output: os.PathLike) -> None:
"""Compress code tarball using pigz if available, otherwise gzip"""
if pigz := shutil.which("pigz"):
with open(output, "wb") as gzipped:
subprocess.run([pigz, "-c", source], stdout=gzipped, check=True)
else:
start_time = time.time()
with gzip.GzipFile(filename=output, mode="wb", mtime=0) as gzipped:
with open(source, "rb") as source_file:
gzipped.write(source_file.read())

end_time = time.time()
warning_time = 10
if end_time - start_time > warning_time:
click.secho(
f"Code tarball compression took {end_time - start_time:.0f} seconds. Consider installing `pigz` for faster compression.",
fg="yellow",
)


def fast_package(
source: os.PathLike,
output_dir: os.PathLike,
Expand Down Expand Up @@ -139,9 +161,7 @@ def fast_package(
filter=lambda x: tar_strip_file_attributes(x),
)

with gzip.GzipFile(filename=archive_fname, mode="wb", mtime=0) as gzipped:
with open(tar_path, "rb") as tar_file:
gzipped.write(tar_file.read())
compress_tarball(tar_path, archive_fname)

# Original tar command - This condition to be removed in the future.
else:
Expand All @@ -164,9 +184,7 @@ def fast_package(
)
# tar.list(verbose=True)

with gzip.GzipFile(filename=archive_fname, mode="wb", mtime=0) as gzipped:
with open(tar_path, "rb") as tar_file:
gzipped.write(tar_file.read())
compress_tarball(tar_path, archive_fname)

return archive_fname

Expand Down

0 comments on commit 7290c2a

Please sign in to comment.