Enable Spark Fast Register #2765

Merged: pingsutw merged 36 commits into master on Sep 25, 2024
Conversation

pingsutw (Member) commented:

Tracking issue

NA

Why are the changes needed?

When we fast-register a Spark task, the Spark executors always fail with a module-not-found error for locally defined modules.

What changes were proposed in this pull request?

Archive the current working directory into a zip file and ship it to the Spark executors; the executors then load the local modules from that zip file.
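
For illustration, a minimal sketch of the idea, not the PR's exact implementation: the helper name zip_current_directory is made up, and a plain SparkSession is used where flytekit would supply flytekit.current_context().spark_session. SparkContext.addPyFile is one standard way to put an archive on the executors' sys.path.

import os
import shutil
import tempfile

from pyspark.sql import SparkSession


def zip_current_directory() -> str:
    # Hypothetical helper: archive the working directory so executors can
    # import local modules (e.g. long_description, misc.ignore_pattens).
    base = os.path.join(tempfile.mkdtemp(), "fast_register")
    # make_archive appends ".zip" and returns the archive's full path.
    return shutil.make_archive(base, "zip", os.getcwd())


spark = SparkSession.builder.getOrCreate()
# addPyFile ships the archive to every executor and adds it to sys.path,
# so imports of local modules resolve on the executors too.
spark.sparkContext.addPyFile(zip_current_directory())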

How was this patch tested?

pyflyte run --remote fast_spark.py my_spark

Setup process

import datetime
import random
from operator import add

import flytekit
from flytekit import ImageSpec, Resources, task, workflow
from flytekitplugins.spark import Spark
from long_description import hello  # local module 
from misc.ignore_pattens import hello123  # local module

flytekit_hash = "be5d8eac263fc8e5d57de110f9648b696d48f20d"
new_flytekit = f"git+https://github.com/flyteorg/flytekit@{flytekit_hash}"
spark_plugins = f"git+https://github.com/flyteorg/flytekit.git@{flytekit_hash}#subdirectory=plugins/flytekit-spark"
custom_image = ImageSpec(
    # python_version="3.9",
    # base_image="pingsutw/spark:v1",
    registry="ghcr.io/flyteorg",
    # packages=[spark_plugins],
    packages=[spark_plugins, new_flytekit],
    apt_packages=["git"]
)


@task(
    task_config=Spark(
        # This configuration is applied to the Spark cluster
        spark_conf={
            "spark.driver.memory": "1000M",
            "spark.executor.memory": "1000M",
            "spark.executor.cores": "1",
            "spark.executor.instances": "2",
            "spark.driver.cores": "1",
            "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-latest.jar",
        },
        executor_path="/usr/bin/python3",
        applications_path="local:///usr/local/bin/entrypoint.py",
    ),
    limits=Resources(mem="2000M"),
    container_image=custom_image,
)
def hello_spark(partitions: int) -> float:
    print("Starting Spark with Partitionsssss: {}".format(partitions))
    hello()
    hello123()

    n = 1 * partitions
    sess = flytekit.current_context().spark_session
    count = sess.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)

    pi_val = 4.0 * count / n
    return pi_val


def f(_):
    x = random.random() * 2 - 1
    y = random.random() * 2 - 1
    return 1 if x**2 + y**2 <= 1 else 0


@task(
    container_image=custom_image,
)
def print_every_time(value_to_print: float, date_triggered: datetime.datetime) -> int:
    print("My printed value: {} @ {}".format(value_to_print, date_triggered))
    hello()
    hello123()

    return 1


@workflow
def my_spark(triggered_date: datetime.datetime = datetime.datetime(2020, 9, 11)) -> float:
    """
    Using this workflow is no different from using any other workflow. Since the image is a property of the task,
    the workflow does not care how the image is configured.
    """
    pi = hello_spark(partitions=1)
    print_every_time(value_to_print=pi, date_triggered=triggered_date)
    return pi

Screenshots

[Screenshot taken 2024-09-23 at 2:03 PM]

Check all the applicable boxes

  • I updated the documentation accordingly.
  • All new and existing tests passed.
  • All commits are signed-off.

Related PRs

NA

Docs link

NA

@@ -270,6 +270,9 @@ def _resolve_abs_module_name(self, path: str, package_root: typing.Optional[str]
# Let us remove any extensions like .py
basename = os.path.splitext(basename)[0]

if not Path(dirname).is_dir():
pingsutw (Member Author) commented:

since the executor will load the workflow from the zip file.
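For context, a simplified sketch of the kind of loop this guard lives in; the function below is an illustrative reconstruction, not flytekit's actual _resolve_abs_module_name:

import os
from pathlib import Path


def resolve_abs_module_name(path: str) -> str:
    # Walk up from the file, prefixing package names while __init__.py exists.
    dirname, basename = os.path.split(path)
    module_name = os.path.splitext(basename)[0]
    while True:
        # On a Spark executor the workflow file lives inside the shipped zip,
        # so `dirname` is not a directory on disk; stop walking instead of
        # failing, and let the import machinery find the module in the zip.
        if not Path(dirname).is_dir():
            break
        if not os.path.exists(os.path.join(dirname, "__init__.py")):
            break
        dirname, pkg = os.path.split(dirname)
        module_name = f"{pkg}.{module_name}"
    return module_name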

pingsutw (Member Author) commented:

cc @fiedlerNr9

current_time = time.time()
current_dir = os.getcwd()
# Set the modification time of all files in the current directory to the current time,
# since fast register doesn't preserve file modification times and make_archive
# (the zip format) fails on timestamps before 1980.
Contributor commented:

we arbitrarily set the time in fast register to 1970; it could be any constant. If we made it 1980, would that make this cleaner?

pingsutw (Member Author) replied:

set it to 1980
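
A minimal sketch of the pattern under discussion, assuming the reset happens right before archiving; reset_mtimes_and_zip is an illustrative name, not the plugin's real API:

import os
import shutil
import time


def reset_mtimes_and_zip(root_dir: str, archive_base: str) -> str:
    # Fast register stamps files with a constant epoch-era time for
    # deterministic hashing, but the zip format rejects timestamps before
    # 1980, so bump every file to "now" before archiving.
    now = time.time()
    for dirpath, _, filenames in os.walk(root_dir):
        for name in filenames:
            os.utime(os.path.join(dirpath, name), (now, now))
    return shutil.make_archive(archive_base, "zip", root_dir)

As the thread concludes, stamping 1980 instead of 1970 at fast-register time makes this reset unnecessary.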


codecov bot commented Sep 25, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 93.34%. Comparing base (7f54171) to head (5e4076d).
Report is 17 commits behind head on master.

Additional details and impacted files
@@             Coverage Diff             @@
##            master    #2765      +/-   ##
===========================================
- Coverage   100.00%   93.34%   -6.66%     
===========================================
  Files            5       39      +34     
  Lines          122     1712    +1590     
===========================================
+ Hits           122     1598    +1476     
- Misses           0      114     +114     

wild-endeavor
wild-endeavor previously approved these changes Sep 25, 2024
pingsutw merged commit f394bc9 into master on Sep 25, 2024
29 of 30 checks passed