Add DatabricksJobsCreateOperator
#32221
Conversation
Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst)
DatabricksJobsCreateOperator
cc: @alexott and team?
I think I figured it out.
Let's see. Approved the run.
I notice the type is added at the end of the docstring. I don't see this pattern in other databricks operators. Do we need to keep it as we have already annotated it in the __init__ method?
Other operators don't use data classes; that's why there are no types there.
Yep, but I thought the type annotation was enough? Not a blocker, though.
alexott
left a comment
Let's rename the functions, and check regarding access_control_list; I already asked in the engineering channel.
merge from upstream
alexott
left a comment
lgtm
Needs tests fixing.
nitpick: Should we change the type to dict instead?
Hey @potiuk, is anything holding this up from being merged? Is it just the type hint comment?
Mostly just the lack of a ping after you fixed the tests so the workflows could run, I think. Generally, if you see that your PR needs attention, just comment on it; otherwise it can very easily be missed that you fixed the tests. It's up to the author to bring the PR to the attention of those who might review it: you have one PR to care about, while some reviewers likely have 40-50 to look at.
@potiuk I updated the branch a little while ago; please let me know if this works.
Some other PR got merged in the meantime, so you need to rebase and resolve the conflict. Can you also please respond to all the comments in this PR and confirm that they've been addressed? There were lots of comments from @alexott and I am not sure if all have been addressed.
alexott
left a comment
lgtm
Let's see if our CI agrees to merge it. I just approved it for running; let me know if/when it succeeds (or fails) @stikkireddy
@potiuk Seems like it failed due to static checks; I will take a look at the type hints and ping you again once they are resolved.
```python
if name is not None:
    self.json["name"] = name
if tags is not None:
    self.json["tags"] = tags
```
Since json is a templated field, attempting to modify it in this way will fail or not work as expected if the input arg is a string (e.g. "{{ var.json....}}") or an XComArg (meaning it's the output of a previous task). Template fields are not rendered until just before the execute method is called.
It would be best to move modifying json (and generally any template field) to the execute method instead.
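For illustration, a minimal sketch of that suggestion (hypothetical code, not the PR's actual implementation): keep the raw constructor arguments and apply the overrides inside execute(), after the template fields have been rendered.

```python
from __future__ import annotations

from typing import TYPE_CHECKING, Any, Sequence

from airflow.models.baseoperator import BaseOperator

if TYPE_CHECKING:
    from airflow.utils.context import Context


class DatabricksCreateJobsOperator(BaseOperator):
    template_fields: Sequence[str] = ("json",)

    def __init__(self, *, json: Any = None, name: str | None = None, **kwargs):
        super().__init__(**kwargs)
        # Store raw values only: `json` may still be a Jinja string or an
        # XComArg at this point, so it must not be mutated in the constructor.
        self.json = json or {}
        self.name = name

    def execute(self, context: Context) -> None:
        # Template fields are rendered just before execute() is called, so
        # `self.json` is a plain dict by now and safe to modify.
        if self.name is not None:
            self.json["name"] = self.name
        # ... create the Databricks job from self.json ...
```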
That behaviour isn't different from other Databricks operators: https://github.com/apache/airflow/blob/main/airflow/providers/databricks/operators/databricks.py#L334
To illustrate the point, let's use this example DAG where we define the json arg in a previous task and use its output:
```python
from __future__ import annotations

from pendulum import datetime
from typing import TYPE_CHECKING, Sequence

from airflow.decorators import dag, task
from airflow.models.baseoperator import BaseOperator

if TYPE_CHECKING:
    from airflow.utils.context import Context


class DatabricksCreateJobsOperator(BaseOperator):
    template_fields: Sequence[str] = ("json", "databricks_conn_id")

    def __init__(
        self,
        *,
        json: dict | None = None,
        name: str | None = None,
        tags: dict[str, str] | None = None,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.json = json or {}
        if name is not None:
            self.json["name"] = name
        if tags is not None:
            self.json["tags"] = tags

    def execute(self, context: Context) -> None:
        pass


@dag(start_date=datetime(2023, 1, 1), schedule=None)
def derived_template_fields():
    @task
    def push_json() -> dict[str, str]:
        return {"key1": "val1", "key2": "val2"}

    json = push_json()
    DatabricksCreateJobsOperator(
        task_id="create_job_w_json", json=json, name="some_name", tags={"key3": "value3"}
    )


derived_template_fields()
```

DAG parsing fails with:
```
Running: airflow dags reserialize
[2023-08-31T14:29:57.796+0000] {utils.py:430} WARNING - No module named 'paramiko'
[2023-08-31T14:29:57.816+0000] {utils.py:430} WARNING - No module named 'airflow.providers.dbt'
[2023-08-31T14:29:58.533+0000] {dagbag.py:539} INFO - Filling up the DagBag from /usr/local/airflow/dags
[2023-08-31T14:29:58.615+0000] {dagbag.py:347} ERROR - Failed to import: /usr/local/airflow/dags/derived_template_fields.py
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/airflow/models/dagbag.py", line 343, in parse
    loader.exec_module(new_module)
  File "<frozen importlib._bootstrap_external>", line 940, in exec_module
  File "<frozen importlib._bootstrap>", line 241, in _call_with_frames_removed
  File "/usr/local/airflow/dags/derived_template_fields.py", line 50, in <module>
    derived_template_fields()
  File "/usr/local/lib/python3.11/site-packages/airflow/models/dag.py", line 3798, in factory
    f(**f_kwargs)
  File "/usr/local/airflow/dags/derived_template_fields.py", line 41, in derived_template_fields
    DatabricksCreateJobsOperator(
  File "/usr/local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 436, in apply_defaults
    result = func(self, **kwargs, default_args=default_args)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/dags/derived_template_fields.py", line 27, in __init__
    self.json["name"] = name
    ~~~~~~~~~^^^^^^^^
TypeError: 'PlainXComArg' object does not support item assignment
```
Even if we change the json arg assignment to use the classic XCom Jinja template approach (i.e. `json = "{{ ti.xcom_pull(task_ids='push_json') }}"`), the DAG fails to parse:
```
Running: airflow dags reserialize
[2023-08-31T14:32:01.553+0000] {utils.py:430} WARNING - No module named 'paramiko'
[2023-08-31T14:32:01.574+0000] {utils.py:430} WARNING - No module named 'airflow.providers.dbt'
[2023-08-31T14:32:02.341+0000] {dagbag.py:539} INFO - Filling up the DagBag from /usr/local/airflow/dags
[2023-08-31T14:32:02.415+0000] {dagbag.py:347} ERROR - Failed to import: /usr/local/airflow/dags/derived_template_fields.py
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/airflow/models/dagbag.py", line 343, in parse
    loader.exec_module(new_module)
  File "<frozen importlib._bootstrap_external>", line 940, in exec_module
  File "<frozen importlib._bootstrap>", line 241, in _call_with_frames_removed
  File "/usr/local/airflow/dags/derived_template_fields.py", line 51, in <module>
    derived_template_fields()
  File "/usr/local/lib/python3.11/site-packages/airflow/models/dag.py", line 3798, in factory
    f(**f_kwargs)
  File "/usr/local/airflow/dags/derived_template_fields.py", line 42, in derived_template_fields
    DatabricksCreateJobsOperator(
  File "/usr/local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 436, in apply_defaults
    result = func(self, **kwargs, default_args=default_args)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/dags/derived_template_fields.py", line 27, in __init__
    self.json["name"] = name
    ~~~~~~~~~^^^^^^^^
TypeError: 'str' object does not support item assignment
```
It's possible users haven't had a use case for predefining the json arg from a previous task, Airflow Variable, DAG Param, etc. (accessed via Jinja templates).
Seems like there is now some movement on addressing #29069 to prevent this in the future too.
@josh-fell well spotted, that's a relevant concern! Since there is a separate ticket to address this issue, would it be okay if we consider this change outside the scope of the current PR?
Totally fair. All of the Databricks operators would need to be cleaned up, but I also haven't heard of anyone running into this issue within their use cases.
I can log a separate issue and maybe we can get some folks to contribute! It's a solid "good first issue".
That's great, thanks a lot, @josh-fell! Please share the ticket once you create it - it's valuable and relevant.
Sorry for the mighty delay here, just got around to logging the issue.
Hey @stikkireddy, you've come a long way on this. Is there any chance you could resolve the conflicts and get the CI tests to pass? It feels like we're super close to merging this feature.
generated/provider_dependencies.json
| "aiohttp>=3.6.3, <4", | ||
| "apache-airflow-providers-common-sql>=1.5.0", | ||
| "apache-airflow>=2.4.0", | ||
| "databricks-sdk>=0.1.11, <1.0.0", |
Is there a reason we need/want to pin it to <1.0.0?
Lee-W
left a comment
Left a few questions, but they're non-blockers. Also, we might need to fix the CI failures.
…ricks-jobs-create # Conflicts: # generated/provider_dependencies.json
…fic version ==0.10.0
…fic version ==0.10.0
```
# The 2.9.1 (to be released soon) already contains the fix
- databricks-sql-connector>=2.0.0, <3.0.0, !=2.9.0
- aiohttp>=3.6.3, <4
- databricks-sdk==0.10.0
```
Why limit to a specific version? We rarely do this.
I also don't understand whether this is a dependency or an extra dependency.
```
- name: sdk
  description: Install Databricks SDK
  dependencies:
    - databricks-sdk==0.10.0
```
same question
Wondering why the job
Should be rebased now. The issue should be fixed with 3 PRs I did during the last few days.
This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.
Add the DatabricksJobsCreateOperator for use cases where the DatabricksSubmitRunOperator is insufficient.
closes: #29733
Continuation of @kyle-winkelman's work in #29790.
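For context, a usage sketch of the operator under the interface discussed in this thread (the DAG id, task id, and job spec are illustrative assumptions, and the import path may differ in the merged provider):

```python
from __future__ import annotations

from pendulum import datetime

from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksCreateJobsOperator

with DAG("example_databricks_jobs_create", start_date=datetime(2023, 1, 1), schedule=None):
    # Create (or reset) a Databricks job from a Jobs API-style spec; per the
    # reviewed code, `name` overrides any name already present in `json`.
    DatabricksCreateJobsOperator(
        task_id="create_job",      # hypothetical task id
        json={"tasks": []},        # job/task definitions would go here
        name="airflow-created-job",
    )
```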