From f98f88da82773e12bcc915226f29e0315fea5048 Mon Sep 17 00:00:00 2001 From: Jan Fiedler Date: Mon, 29 Jul 2024 13:05:50 -0700 Subject: [PATCH] exclude fast register tar.gz files --- .../flytekit-ray/flytekitplugins/ray/task.py | 23 ++++++++++++++----- 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/plugins/flytekit-ray/flytekitplugins/ray/task.py b/plugins/flytekit-ray/flytekitplugins/ray/task.py index e32d2428942..38d28cfb6ff 100644 --- a/plugins/flytekit-ray/flytekitplugins/ray/task.py +++ b/plugins/flytekit-ray/flytekitplugins/ray/task.py @@ -71,15 +71,18 @@ def pre_execute(self, user_params: ExecutionParameters) -> ExecutionParameters: ctx = FlyteContextManager.current_context() if not ctx.execution_state.is_local_execution(): working_dir = os.getcwd() - init_params["runtime_env"] = {"working_dir": working_dir} + init_params["runtime_env"] = { + "working_dir": working_dir, + "excludes": ["script_mode.tar.gz", "fast*.tar.gz"], + } cfg = self._task_config if cfg.excludes_working_dir: - init_params["runtime_env"]["excludes"] = cfg.excludes_working_dir + init_params["runtime_env"]["excludes"].append(cfg.excludes_working_dir) # fast register data with timestamp mtime=0 will be zipped and uploaded to ray gcs # zip does not support timestamps before 1980 -> hacky workaround of touching all the files - os.system(f"touch `find {working_dir} -type f`") + # os.system(f"touch `find {working_dir} -type f`") ray.init(**init_params) return user_params @@ -88,14 +91,20 @@ def get_custom(self, settings: SerializationSettings) -> Optional[Dict[str, Any] cfg = self._task_config # Deprecated: runtime_env is removed KubeRay >= 1.1.0. It is replaced by runtime_env_yaml - runtime_env = base64.b64encode(json.dumps(cfg.runtime_env).encode()).decode() if cfg.runtime_env else None + runtime_env = ( + base64.b64encode(json.dumps(cfg.runtime_env).encode()).decode() + if cfg.runtime_env + else None + ) runtime_env_yaml = yaml.dump(cfg.runtime_env) if cfg.runtime_env else None ray_job = RayJob( ray_cluster=RayCluster( head_group_spec=( - HeadGroupSpec(cfg.head_node_config.ray_start_params) if cfg.head_node_config else None + HeadGroupSpec(cfg.head_node_config.ray_start_params) + if cfg.head_node_config + else None ), worker_group_spec=[ WorkerGroupSpec( @@ -107,7 +116,9 @@ def get_custom(self, settings: SerializationSettings) -> Optional[Dict[str, Any] ) for c in cfg.worker_node_config ], - enable_autoscaling=(cfg.enable_autoscaling if cfg.enable_autoscaling else False), + enable_autoscaling=( + cfg.enable_autoscaling if cfg.enable_autoscaling else False + ), ), runtime_env=runtime_env, runtime_env_yaml=runtime_env_yaml,