-
Notifications
You must be signed in to change notification settings - Fork 14.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support YAML input for CloudBuildCreateOperator #8808
Support YAML input for CloudBuildCreateOperator #8808
Conversation
removed spaces
:return: dict | ||
""" | ||
try: | ||
with open(self.body, 'r') as f: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think, we should use template_ext to load file. This will cause us to still have text instead of the dictionary, so we need to overwrite the prepare_template method to convert the string to a dictionary
def _load_body_to_dict(self): | ||
""" | ||
:param file: | ||
file path to YAML build config |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you also add JSON file support? This is not common, but build can also be defined as JSON files.
You can write the build config file using the YAML or the JSON syntax.
https://cloud.google.com/cloud-build/docs/build-config#structure_of_a_build_config_file
airflow/providers/google/cloud/example_dags/example_cloud_build.py
Outdated
Show resolved
Hide resolved
airflow/providers/google/cloud/example_dags/example_cloud_build.yaml
Outdated
Show resolved
Hide resolved
@@ -99,6 +99,12 @@ | |||
task_id="create_build_from_repo_result", | |||
) | |||
|
|||
create_build_from_file = CloudBuildCreateOperator( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add this example to guide? https://airflow.readthedocs.io/en/latest/howto/operator/gcp/cloud_build.html
# specific language governing permissions and limitations | ||
# under the License. | ||
|
||
steps: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Traceback (most recent call last):
File "/usr/local/bin/airflow", line 11, in <module>
load_entry_point('apache-airflow', 'console_scripts', 'airflow')()
File "/opt/airflow/airflow/__main__.py", line 40, in main
args.func(args)
File "/opt/airflow/airflow/cli/cli_parser.py", line 52, in command
return func(*args, **kwargs)
File "/opt/airflow/airflow/utils/cli.py", line 84, in wrapper
return f(*args, **kwargs)
File "/opt/airflow/airflow/cli/commands/task_command.py", line 341, in task_test
ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True)
File "/opt/airflow/airflow/utils/session.py", line 61, in wrapper
return func(*args, **kwargs)
File "/opt/airflow/airflow/models/taskinstance.py", line 1143, in run
session=session)
File "/opt/airflow/airflow/utils/session.py", line 57, in wrapper
return func(*args, **kwargs)
File "/opt/airflow/airflow/sentry.py", line 140, in wrapper
return func(task_instance, *args, session=session, **kwargs)
File "/opt/airflow/airflow/models/taskinstance.py", line 1024, in _run_raw_task
result = task_copy.execute(context=context)
File "/opt/airflow/airflow/providers/google/cloud/operators/cloud_build.py", line 217, in execute
body = BuildProcessor(body=self.body).process_body()
File "/opt/airflow/airflow/providers/google/cloud/operators/cloud_build.py", line 99, in process_body
self._verify_source()
File "/opt/airflow/airflow/providers/google/cloud/operators/cloud_build.py", line 52, in _verify_source
is_storage = "storageSource" in self.body["source"]
KeyError: 'source'
I'm afraid this file won't work. Can you make this optional key?
Can you make this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
providing a source is optional for a build config. Should we always check and fail if the body misses a source?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should allow the build object to be passed without sources.
I tried to implement this feature based on template_ext. What do you think? 05:31 $ git diff --cached
diff --git a/airflow/providers/google/cloud/operators/cloud_build.py b/airflow/providers/google/cloud/operators/cloud_build.py
index b86b45d12..739281c00 100644
--- a/airflow/providers/google/cloud/operators/cloud_build.py
+++ b/airflow/providers/google/cloud/operators/cloud_build.py
@@ -16,6 +16,7 @@
# specific language governing permissions and limitations
# under the License.
"""Operators that integrat with Google Cloud Build service."""
+import json
import re
from copy import deepcopy
from typing import Any, Dict, Iterable, Optional, Union
@@ -85,17 +86,6 @@ class BuildProcessor:
self.body["source"]["storageSource"] = self._convert_storage_url_to_dict(source)
- def _load_body_to_dict(self):
- """
- :param file:
- file path to YAML build config
- :return: dict
- """
- try:
- with open(self.body, 'r') as f:
- self.body = yaml.safe_load(f)
- except yaml.YAMLError as e:
- raise AirflowException("Exception when loading resource definition: %s\n" % e)
def process_body(self):
"""
@@ -104,10 +94,6 @@ class BuildProcessor:
:return: the body.
:type: dict
"""
-
- if isinstance(self.body, str):
- self._load_body_to_dict()
- return self.body
self._verify_source()
self._reformat_source()
return self.body
@@ -193,6 +179,7 @@ class CloudBuildCreateOperator(BaseOperator):
"""
template_fields = ("body", "gcp_conn_id", "api_version") # type: Iterable[str]
+ template_ext = ['.yml', '.yaml', '.json']
@apply_defaults
def __init__(self,
@@ -203,11 +190,22 @@ class CloudBuildCreateOperator(BaseOperator):
*args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.body = body
+ # Not template fields to keep original value
+ self.body_raw = body
self.project_id = project_id
self.gcp_conn_id = gcp_conn_id
self.api_version = api_version
self._validate_inputs()
+ def prepare_template(self) -> None:
+ # If no file is specified, skip
+ if not isinstance(self.body_raw, str):
+ return
+ if any(self.body_raw.endswith(ext) for ext in ['.yml', '.yaml']):
+ self.body = yaml.load(self.body)
+ if self.body_raw.endswith('.json'):
+ self.body = json.loads(self.body)
+
def _validate_inputs(self):
if not self.body:
raise AirflowException("The required parameter 'body' is missing") If you agree with this idea, could you add tests to this PR? |
@mik-laj that's a great implementation. I was not aware of prepare_template but is it suited for it. |
@joppevos Yes. Go ahead and use it. |
Added the feedback. The only thing I have to make now is the test. I assume the test would check if it is able to work correctly given a YAML or JSON build config. How would I approach this? Something like the code below? New to testing to be honest.
|
@joppevos It looks good for me, but before calling the execute method, you must call the dag.resolve_template_files() method. |
@joppevos Github action is sad. Can you fix it? |
Travis is sad. Do you need any help? |
@mik-laj You are helping me at every step ;) thanks. It seems that the |
@mik-laj The absolute path did indeed the trick. Build is passing now 👍 |
addresses the following issue.
Make sure to mark the boxes below before creating PR: [x]
In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.
Read the Pull Request Guidelines for more information.