From 2e3eb5844e31c4d656006c67c7d3a77dd6f9c9be Mon Sep 17 00:00:00 2001
From: zhen
Date: Mon, 18 Mar 2024 17:48:17 +0800
Subject: [PATCH] [Cache] Fix the wrong cache when local file input in a pipeline (#34743)

* fix cache bug

* fix test case

* fix code style

* format

* fix code style

* test format

* fix code format

* Add debug log

* Add debug log

* fix code style

* format code

* fix code style

* format code
---
 .../entities/_component/pipeline_component.py | 24 +++++++++++++++++++
 .../unittests/test_pipeline_job_entity.py     | 21 ++++++++++++++++
 2 files changed, 45 insertions(+)

diff --git a/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_component/pipeline_component.py b/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_component/pipeline_component.py
index a6db0a14043c..5e3f91894cfa 100644
--- a/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_component/pipeline_component.py
+++ b/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_component/pipeline_component.py
@@ -6,7 +6,9 @@

 import json
 import logging
+import os
 import re
+import time
 import typing
 from collections import Counter
 from typing import Any, Dict, List, Optional, Tuple, Union
@@ -16,6 +18,7 @@
 from azure.ai.ml._restclient.v2022_10_01.models import ComponentVersion, ComponentVersionProperties
 from azure.ai.ml._schema import PathAwareSchema
 from azure.ai.ml._schema.pipeline.pipeline_component import PipelineComponentSchema
+from azure.ai.ml._utils._asset_utils import get_object_hash
 from azure.ai.ml._utils.utils import hash_dict, is_data_binding_expression
 from azure.ai.ml.constants._common import ARM_ID_PREFIX, ASSET_ARM_ID_REGEX_FORMAT, COMPONENT_TYPE
 from azure.ai.ml.constants._component import ComponentSource, NodeType
@@ -334,6 +337,27 @@ def _get_anonymous_hash(self) -> str:
         # command component), so we just use rest object to generate hash for pipeline component,
         # which doesn't have reuse issue.
         component_interface_dict = self._to_rest_object().properties.component_spec
+        # Hash local inputs in pipeline component jobs
+        for job_name, job in self.jobs.items():
+            if getattr(job, "inputs", None):
+                for input_name, input_value in job.inputs.items():
+                    try:
+                        if (
+                            isinstance(input_value._data, Input)
+                            and input_value.path
+                            and os.path.exists(input_value.path)
+                        ):
+                            start_time = time.time()
+                            component_interface_dict["jobs"][job_name]["inputs"][input_name][
+                                "content_hash"
+                            ] = get_object_hash(input_value.path)
+                            module_logger.debug(
+                                "Takes %s seconds to calculate the content hash of local input %s",
+                                time.time() - start_time,
+                                input_value.path,
+                            )
+                    except ValidationException:
+                        pass
         hash_value: str = hash_dict(
             component_interface_dict,
             keys_to_omit=[

diff --git a/sdk/ml/azure-ai-ml/tests/pipeline_job/unittests/test_pipeline_job_entity.py b/sdk/ml/azure-ai-ml/tests/pipeline_job/unittests/test_pipeline_job_entity.py
index 9dcb73989d39..3e07891725b1 100644
--- a/sdk/ml/azure-ai-ml/tests/pipeline_job/unittests/test_pipeline_job_entity.py
+++ b/sdk/ml/azure-ai-ml/tests/pipeline_job/unittests/test_pipeline_job_entity.py
@@ -2194,3 +2194,24 @@ def test_pipeline_job_with_data_binding_expression_on_spark_resource(self, mock_
             "instance_type": "${{parent.inputs.instance_type}}",
             "runtime_version": "3.2.0",
         }
+
+    def test_local_input_in_pipeline_job(self, client: MLClient, tmp_path: Path):
+        file_path = tmp_path / "mock_input_file"
+        file_path.touch(exist_ok=True)
+        component_path = "./tests/test_configs/components/1in1out.yaml"
+        component_func = load_component(source=component_path)
+
+        @pipeline()
+        def pipeline_with_local_input():
+            input_folder = Input(type="uri_folder", path=tmp_path)
+            component_func(input1=input_folder)
+
+        pipeline_obj = pipeline_with_local_input()
+        pipeline_obj.component.jobs["one_in_one_out"]._component = "mock_component_id"
+        pipeline_hash_id = pipeline_obj.component._get_anonymous_hash()
+
+        with open(file_path, "w") as f:
+            f.write("mock_file")
+
+        new_pipeline_hash_id = pipeline_obj.component._get_anonymous_hash()
+        assert new_pipeline_hash_id != pipeline_hash_id
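
Notes:

Why the fix works, as a minimal sketch (not SDK code: hash_dict below is a
simplified stand-in for azure.ai.ml._utils.utils.hash_dict, and the dict shape
is an illustrative stand-in for the REST component spec serialized by
_get_anonymous_hash). Before this patch the spec carried only the input path,
so the anonymous hash used as the component cache key stayed the same even
when the local file's bytes changed; injecting a per-input content_hash makes
the key track content:

    import hashlib
    import json


    def hash_dict(d):
        # Stable digest over a JSON-serializable dict.
        return hashlib.sha256(json.dumps(d, sort_keys=True).encode()).hexdigest()


    # Before the fix: only the input *path* is hashed, so editing the file's
    # bytes leaves the anonymous hash (and thus the cache key) unchanged.
    spec = {"jobs": {"step1": {"inputs": {"input1": {"path": "./data/input.csv"}}}}}
    assert hash_dict(spec) == hash_dict(spec)  # same key even after an edit

    # After the fix: a content hash of the local input is injected into the
    # spec, so changed bytes yield a different anonymous hash and a cache miss.
    spec["jobs"]["step1"]["inputs"]["input1"]["content_hash"] = "digest-of-old-bytes"
    old_key = hash_dict(spec)
    spec["jobs"]["step1"]["inputs"]["input1"]["content_hash"] = "digest-of-new-bytes"
    new_key = hash_dict(spec)
    assert old_key != new_key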
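The content hashing itself is delegated to get_object_hash from
azure.ai.ml._utils._asset_utils. As a rough approximation of what a content
hash over a local file or folder involves (the real helper also honors ignore
files and differs in detail; everything below is an assumption for
illustration, not the SDK's implementation):

    import hashlib
    import os


    def content_hash(path):
        # The digest depends on file names and bytes, not the path string alone.
        digest = hashlib.sha256()
        if os.path.isfile(path):
            with open(path, "rb") as f:
                digest.update(f.read())
        else:
            for root, dirs, files in os.walk(path):
                dirs.sort()  # sort in place so os.walk descends deterministically
                for name in sorted(files):
                    full_path = os.path.join(root, name)
                    # Include the relative name so renames also change the digest.
                    digest.update(os.path.relpath(full_path, path).encode())
                    with open(full_path, "rb") as f:
                        digest.update(f.read())
        return digest.hexdigest()

This is also why the test above only needs to write new bytes into
mock_input_file under tmp_path for _get_anonymous_hash to return a new value.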