-
Notifications
You must be signed in to change notification settings - Fork 14.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add new SQLCheckOperators #23915
Add new SQLCheckOperators #23915
Conversation
This commit adds two new SQL Check Operators to the core sql.py file, one called SQLColumnCheckOperator and one called SQLTableCheckOperator. Corresponding tests are added as well. The current functionality of these operators is minimal, but expected to grow over time. These operators, unlike current ones, save information about the results of the check queries that is then parseable by third party libraries.
Co-authored-by: Tzu-ping Chung <uranusjr@gmail.com>
Accidentally copied the last line of the file several times, this commit remedies that change.
Question I have for reviewers: would it make sense to add the Snowflake, Bigquery, Redshift, etc... versions of this operator in with this PR, or open a separate issue & PR for the provider updates? |
My vote would be for separate ones. |
Improved the column check operator to include support for greater than, geq, less than, and leq functions. Updated logic and tests to reflect this change.
Improved the table check operator by refactoring how checks are performed; this makes it much more customizeable at the expense of knowing the specific result of checks, as the results are now all just 0 or 1.
Definitely separate. Also to note - those classes will not be available in Airlfow's core until 2.4 release and our providers are 2.1 (and soon 2.2-compatible). So when you add it to providers, it has to be done in the way that the proivders are importable in 2.2 as well and that those new opertors are treated as optional and provide the right message when someone tries to use the operator in Airflow 2.2/2.3 |
airflow/operators/sql.py
Outdated
|
||
column_checks = { | ||
# pass value should be number of acceptable nulls | ||
"null_check": "SUM(CASE WHEN 'column' IS NULL THEN 1 ELSE 0 END) AS column_null_check", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"null_check": "SUM(CASE WHEN 'column' IS NULL THEN 1 ELSE 0 END) AS column_null_check", | |
"null_check": "SUM(CASE WHEN column IS NULL THEN 1 ELSE 0 END) AS column_null_check", |
Should be checking for the column value rather than the string of the the column name I think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ummm... good question. I'm not 100% sure. Testing on Snowflake, the lack of '
version had a parse error iirc. So it might be a dialect issue that can be resolved in the provider versions.
|
||
self.table = table | ||
self.column_mapping = column_mapping | ||
self.sql = f"SELECT * FROM {self.table};" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just curious, why is this needed here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For OpenLineage. The Extractor needs some static query to render, unless we want to have the entire templated query rendered in __init__()
. I can double-check this, though, it might have changed with the move to the Listener.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just to reiterate - those "common" SQL classes will not be released unti Airflow 2.4 and it means that the first operator tha will be able to use them as "mandatory" will be 12 months after 2.4 is released (that's our policy for providers), so I just want to make you aware @denimalpaca that those classes won't be to useful for quite some time.
Maybe a better Idea will be to add a separate "base sql" provider which will not be in the "airflow/operators" but it will be a separate package that could be installed additionally to airflow. We have not done so before, but maybe there is a good reason to introduce such a "shared sql" provider that wil encapsulate all SQL-like base functionality that open lineage might base on? I think trying to implement it in Airflow core is a bad idea, if the goal is fast adoption by multiple providers.
Just saying.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe a better Idea will be to add a separate "base sql" provider
I think this is an interesting idea and I support it. We could start with these operators and slowly move existing core SQL operators over too.
@potiuk Do you think this provider would be included with Airflow installs like HTTP, FTP, SQLite, and IMAP, or completely separate?
@@ -385,6 +387,82 @@ def test_fail_min_sql_max_value(self, mock_get_db_hook): | |||
operator.execute() | |||
|
|||
|
|||
class TestColumnCheckOperator(unittest.TestCase): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMO this can be a pytest
test rather than continuing to use unittest
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a reason to use one over the other? I was just copying the test pattern from the other ones.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We prefer pytest test type unless there are good reasons not to (for new test)
def _construct_operator(self, column_mapping): | ||
return SQLColumnCheckOperator(task_id="test_task", table="test_table", column_mapping=column_mapping) | ||
|
||
@mock.patch.object(SQLColumnCheckOperator, "get_db_hook") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since every test case uses the same mocked object, you could think about using a pytest fixture for mocking get_db_hook
. I don't have a strong opinion though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you use pytest fixtures, the monkeypatch
fixture makes this easy to do in setup_method
.
New docstrings were using example dictionaries in the param list, and the way they were written before caused doc build errors. The fix was to update the formatting and add missing words to the word list.
Moves record logging below the AirflowException for no records, and makes the Exception message more clear.
Co-authored-by: Josh Fell <48934154+josh-fell@users.noreply.github.com>
SQLTableCheckOperator was rendering an invalid SQL query, this fix changes the way the query template is built from the passed in strings.
After some research, it seems the null_check query does *not* need quotes around the column name.
Closing this PR in favor of #24476 |
This pull request adds two new SQL Check Operators to the core sql.py file, one called SQLColumnCheckOperator and one called SQLTableCheckOperator. Corresponding tests are added as well. The current functionality of these operators is minimal, but expected to grow over time. These operators, unlike current ones, save information about the results
of the check queries that is then parseable by third party libraries.
closes: #23874