Skip to content

Commit

Permalink
exclude fast register tar.gz files
Browse files Browse the repository at this point in the history
Signed-off-by: Jan Fiedler <jan@union.ai>
  • Loading branch information
fiedlerNr9 committed Jul 29, 2024
1 parent 357b3c6 commit 061698f
Showing 1 changed file with 17 additions and 6 deletions.
23 changes: 17 additions & 6 deletions plugins/flytekit-ray/flytekitplugins/ray/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,18 @@ def pre_execute(self, user_params: ExecutionParameters) -> ExecutionParameters:
ctx = FlyteContextManager.current_context()
if not ctx.execution_state.is_local_execution():
working_dir = os.getcwd()
init_params["runtime_env"] = {"working_dir": working_dir}
init_params["runtime_env"] = {
"working_dir": working_dir,
"excludes": ["script_mode.tar.gz", "fast*.tar.gz"],
}

cfg = self._task_config
if cfg.excludes_working_dir:
init_params["runtime_env"]["excludes"] = cfg.excludes_working_dir
init_params["runtime_env"]["excludes"].append(cfg.excludes_working_dir)

# fast register data with timestamp mtime=0 will be zipped and uploaded to ray gcs
# zip does not support timestamps before 1980 -> hacky workaround of touching all the files
os.system(f"touch `find {working_dir} -type f`")
# os.system(f"touch `find {working_dir} -type f`")

ray.init(**init_params)
return user_params
Expand All @@ -88,14 +91,20 @@ def get_custom(self, settings: SerializationSettings) -> Optional[Dict[str, Any]
cfg = self._task_config

# Deprecated: runtime_env is removed KubeRay >= 1.1.0. It is replaced by runtime_env_yaml
runtime_env = base64.b64encode(json.dumps(cfg.runtime_env).encode()).decode() if cfg.runtime_env else None
runtime_env = (
base64.b64encode(json.dumps(cfg.runtime_env).encode()).decode()
if cfg.runtime_env
else None
)

runtime_env_yaml = yaml.dump(cfg.runtime_env) if cfg.runtime_env else None

ray_job = RayJob(
ray_cluster=RayCluster(
head_group_spec=(
HeadGroupSpec(cfg.head_node_config.ray_start_params) if cfg.head_node_config else None
HeadGroupSpec(cfg.head_node_config.ray_start_params)
if cfg.head_node_config
else None
),
worker_group_spec=[
WorkerGroupSpec(
Expand All @@ -107,7 +116,9 @@ def get_custom(self, settings: SerializationSettings) -> Optional[Dict[str, Any]
)
for c in cfg.worker_node_config
],
enable_autoscaling=(cfg.enable_autoscaling if cfg.enable_autoscaling else False),
enable_autoscaling=(
cfg.enable_autoscaling if cfg.enable_autoscaling else False
),
),
runtime_env=runtime_env,
runtime_env_yaml=runtime_env_yaml,
Expand Down

0 comments on commit 061698f

Please sign in to comment.