From 61cdf10c76fbda35e93b80bf1e810b7d5d6dcff9 Mon Sep 17 00:00:00 2001
From: Bora Berke Sahin <67373739+boraberke@users.noreply.github.com>
Date: Wed, 12 Jun 2024 17:43:25 +0300
Subject: [PATCH] Resolve deprecations in `BigQuery` operators (#40182)

---
 tests/deprecations_ignore.yml                |  12 -
 .../google/cloud/operators/test_bigquery.py  | 379 +++++++++---------
 2 files changed, 197 insertions(+), 194 deletions(-)

diff --git a/tests/deprecations_ignore.yml b/tests/deprecations_ignore.yml
index bed0c8dcbf822d..a8debe085b2511 100644
--- a/tests/deprecations_ignore.yml
+++ b/tests/deprecations_ignore.yml
@@ -191,19 +191,7 @@
 - tests/providers/google/cloud/hooks/vertex_ai/test_custom_job.py::TestCustomJobWithoutDefaultProjectIdHook::test_delete_pipeline_job
 - tests/providers/google/cloud/hooks/vertex_ai/test_custom_job.py::TestCustomJobWithoutDefaultProjectIdHook::test_get_pipeline_job
 - tests/providers/google/cloud/hooks/vertex_ai/test_custom_job.py::TestCustomJobWithoutDefaultProjectIdHook::test_list_pipeline_jobs
-- tests/providers/google/cloud/operators/test_bigquery.py::TestBigQueryCreateExternalTableOperator::test_execute_with_csv_format
 - tests/providers/google/cloud/operators/test_automl.py::TestAutoMLTrainModelOperator::test_execute
-- tests/providers/google/cloud/operators/test_bigquery.py::TestBigQueryCreateExternalTableOperator::test_execute_with_parquet_format
-- tests/providers/google/cloud/operators/test_bigquery.py::TestBigQueryOperator::test_bigquery_operator_defaults
-- tests/providers/google/cloud/operators/test_bigquery.py::TestBigQueryOperator::test_bigquery_operator_extra_link_when_missing_job_id
-- tests/providers/google/cloud/operators/test_bigquery.py::TestBigQueryOperator::test_bigquery_operator_extra_link_when_multiple_query
-- tests/providers/google/cloud/operators/test_bigquery.py::TestBigQueryOperator::test_bigquery_operator_extra_link_when_single_query
-- tests/providers/google/cloud/operators/test_bigquery.py::TestBigQueryOperator::test_bigquery_operator_extra_serialized_field_when_multiple_queries
-- tests/providers/google/cloud/operators/test_bigquery.py::TestBigQueryOperator::test_bigquery_operator_extra_serialized_field_when_single_query
-- tests/providers/google/cloud/operators/test_bigquery.py::TestBigQueryOperator::test_execute
-- tests/providers/google/cloud/operators/test_bigquery.py::TestBigQueryOperator::test_execute_bad_type
-- tests/providers/google/cloud/operators/test_bigquery.py::TestBigQueryOperator::test_execute_list
-- tests/providers/google/cloud/operators/test_bigquery.py::TestBigQueryPatchDatasetOperator::test_execute
 - tests/providers/google/cloud/operators/test_dataflow.py::TestDataflowCreateJavaJobOperator::test_check_job_not_running_exec
 - tests/providers/google/cloud/operators/test_dataflow.py::TestDataflowCreateJavaJobOperator::test_check_job_running_exec
 - tests/providers/google/cloud/operators/test_dataflow.py::TestDataflowCreateJavaJobOperator::test_check_multiple_job_exec
diff --git a/tests/providers/google/cloud/operators/test_bigquery.py b/tests/providers/google/cloud/operators/test_bigquery.py
index 3fa34467610e4c..6a92fe8089f596 100644
--- a/tests/providers/google/cloud/operators/test_bigquery.py
+++ b/tests/providers/google/cloud/operators/test_bigquery.py
@@ -29,7 +29,13 @@
 from openlineage.client.facet import ErrorMessageRunFacet, ExternalQueryRunFacet, SqlJobFacet
 from openlineage.client.run import Dataset
 
-from airflow.exceptions import AirflowException, AirflowSkipException, AirflowTaskTimeout, TaskDeferred
+from airflow.exceptions import (
+    AirflowException,
+    AirflowProviderDeprecationWarning,
+    AirflowSkipException,
+    AirflowTaskTimeout,
+    TaskDeferred,
+)
 from airflow.providers.google.cloud.operators.bigquery import (
     BigQueryCheckOperator,
     BigQueryColumnCheckOperator,
@@ -249,15 +255,38 @@ def test_create_existing_table(self, mock_hook, caplog, if_exists, is_conflict,
 class TestBigQueryCreateExternalTableOperator:
     @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
     def test_execute_with_csv_format(self, mock_hook):
+        table_resource = {
+            "tableReference": {
+                "projectId": TEST_GCP_PROJECT_ID,
+                "datasetId": TEST_DATASET,
+                "tableId": TEST_TABLE_ID,
+            },
+            "labels": None,
+            "schema": {"fields": []},
+            "externalDataConfiguration": {
+                "source_uris": [
+                    f"gs://{TEST_GCS_BUCKET}/{source_object}" for source_object in TEST_GCS_CSV_DATA
+                ],
+                "source_format": TEST_SOURCE_CSV_FORMAT,
+                "maxBadRecords": 0,
+                "autodetect": True,
+                "compression": "NONE",
+                "csvOptions": {
+                    "fieldDelimiter": ",",
+                    "skipLeadingRows": 0,
+                    "quote": None,
+                    "allowQuotedNewlines": False,
+                    "allowJaggedRows": False,
+                },
+            },
+            "location": None,
+            "encryptionConfiguration": None,
+        }
         operator = BigQueryCreateExternalTableOperator(
             task_id=TASK_ID,
-            destination_project_dataset_table=f"{TEST_GCP_PROJECT_ID}.{TEST_DATASET}.{TEST_TABLE_ID}",
-            schema_fields=[],
             bucket=TEST_GCS_BUCKET,
-            gcs_schema_bucket=TEST_GCS_BUCKET,
             source_objects=TEST_GCS_CSV_DATA,
-            source_format=TEST_SOURCE_CSV_FORMAT,
-            autodetect=True,
+            table_resource=table_resource,
         )
 
         mock_hook.return_value.split_tablename.return_value = (
@@ -267,47 +296,35 @@ def test_execute_with_csv_format(self, mock_hook):
         )
 
         operator.execute(context=MagicMock())
-        mock_hook.return_value.create_empty_table.assert_called_once_with(
-            table_resource={
-                "tableReference": {
-                    "projectId": TEST_GCP_PROJECT_ID,
-                    "datasetId": TEST_DATASET,
-                    "tableId": TEST_TABLE_ID,
-                },
-                "labels": None,
-                "schema": {"fields": []},
-                "externalDataConfiguration": {
-                    "source_uris": [
-                        f"gs://{TEST_GCS_BUCKET}/{source_object}" for source_object in TEST_GCS_CSV_DATA
-                    ],
-                    "source_format": TEST_SOURCE_CSV_FORMAT,
-                    "maxBadRecords": 0,
-                    "autodetect": True,
-                    "compression": "NONE",
-                    "csvOptions": {
-                        "fieldDelimiter": ",",
-                        "skipLeadingRows": 0,
-                        "quote": None,
-                        "allowQuotedNewlines": False,
-                        "allowJaggedRows": False,
-                    },
-                },
-                "location": None,
-                "encryptionConfiguration": None,
-            }
-        )
+        mock_hook.return_value.create_empty_table.assert_called_once_with(table_resource=table_resource)
 
     @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
     def test_execute_with_parquet_format(self, mock_hook):
+        table_resource = {
+            "tableReference": {
+                "projectId": TEST_GCP_PROJECT_ID,
+                "datasetId": TEST_DATASET,
+                "tableId": TEST_TABLE_ID,
+            },
+            "labels": None,
+            "schema": {"fields": []},
+            "externalDataConfiguration": {
+                "source_uris": [
+                    f"gs://{TEST_GCS_BUCKET}/{source_object}" for source_object in TEST_GCS_PARQUET_DATA
+                ],
+                "source_format": TEST_SOURCE_PARQUET_FORMAT,
+                "maxBadRecords": 0,
+                "autodetect": True,
+                "compression": "NONE",
+            },
+            "location": None,
+            "encryptionConfiguration": None,
+        }
         operator = BigQueryCreateExternalTableOperator(
             task_id=TASK_ID,
-            destination_project_dataset_table=f"{TEST_GCP_PROJECT_ID}.{TEST_DATASET}.{TEST_TABLE_ID}",
-            schema_fields=[],
             bucket=TEST_GCS_BUCKET,
-            gcs_schema_bucket=TEST_GCS_BUCKET,
             source_objects=TEST_GCS_PARQUET_DATA,
-            source_format=TEST_SOURCE_PARQUET_FORMAT,
-            autodetect=True,
+            table_resource=table_resource,
         )
 
         mock_hook.return_value.split_tablename.return_value = (
@@ -317,28 +334,7 @@ def test_execute_with_parquet_format(self, mock_hook):
         )
 
         operator.execute(context=MagicMock())
-        mock_hook.return_value.create_empty_table.assert_called_once_with(
-            table_resource={
-                "tableReference": {
-                    "projectId": TEST_GCP_PROJECT_ID,
-                    "datasetId": TEST_DATASET,
-                    "tableId": TEST_TABLE_ID,
-                },
-                "labels": None,
-                "schema": {"fields": []},
-                "externalDataConfiguration": {
-                    "source_uris": [
-                        f"gs://{TEST_GCS_BUCKET}/{source_object}" for source_object in TEST_GCS_PARQUET_DATA
-                    ],
-                    "source_format": TEST_SOURCE_PARQUET_FORMAT,
-                    "maxBadRecords": 0,
-                    "autodetect": True,
-                    "compression": "NONE",
-                },
-                "location": None,
-                "encryptionConfiguration": None,
-            }
-        )
+        mock_hook.return_value.create_empty_table.assert_called_once_with(table_resource=table_resource)
 
 
 class TestBigQueryDeleteDatasetOperator:
@@ -478,12 +474,17 @@ class TestBigQueryPatchDatasetOperator:
     @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
     def test_execute(self, mock_hook):
         dataset_resource = {"friendlyName": "Test DS"}
-        operator = BigQueryPatchDatasetOperator(
-            dataset_resource=dataset_resource,
-            task_id=TASK_ID,
-            dataset_id=TEST_DATASET,
-            project_id=TEST_GCP_PROJECT_ID,
-        )
+        deprecation_message = (
+            r"Call to deprecated class BigQueryPatchDatasetOperator\. "
+            r"\(This operator is deprecated\. Please use BigQueryUpdateDatasetOperator\.\)"
+        )
+        with pytest.warns(AirflowProviderDeprecationWarning, match=deprecation_message):
+            operator = BigQueryPatchDatasetOperator(
+                dataset_resource=dataset_resource,
+                task_id=TASK_ID,
+                dataset_id=TEST_DATASET,
+                project_id=TEST_GCP_PROJECT_ID,
+            )
 
         operator.execute(None)
         mock_hook.return_value.patch_dataset.assert_called_once_with(
@@ -513,6 +514,11 @@ def test_execute(self, mock_hook):
 
 @pytest.mark.db_test
 class TestBigQueryOperator:
+    deprecation_message = (
+        r"Call to deprecated class BigQueryExecuteQueryOperator\. "
+        r"\(This operator is deprecated\. Please use `BigQueryInsertJobOperator`\.\)"
+    )
+
     def teardown_method(self):
         clear_db_xcom()
         clear_db_runs()
@@ -523,33 +529,34 @@ def teardown_method(self):
     def test_execute(self, mock_hook):
         encryption_configuration = {"key": "kk"}
 
-        operator = BigQueryExecuteQueryOperator(
-            task_id=TASK_ID,
-            sql="Select * from test_table",
-            destination_dataset_table=None,
-            write_disposition="WRITE_EMPTY",
-            allow_large_results=False,
-            flatten_results=None,
-            gcp_conn_id="google_cloud_default",
-            udf_config=None,
-            use_legacy_sql=True,
-            maximum_billing_tier=None,
-            maximum_bytes_billed=None,
-            create_disposition="CREATE_IF_NEEDED",
-            schema_update_options=(),
-            query_params=None,
-            labels=None,
-            priority="INTERACTIVE",
-            time_partitioning=None,
-            api_resource_configs=None,
-            cluster_fields=None,
-            encryption_configuration=encryption_configuration,
-            impersonation_chain=["service-account@myproject.iam.gserviceaccount.com"],
-            impersonation_scopes=[
-                "https://www.googleapis.com/auth/cloud-platform",
-                "https://www.googleapis.com/auth/drive",
-            ],
-        )
+        with pytest.warns(AirflowProviderDeprecationWarning, match=self.deprecation_message):
+            operator = BigQueryExecuteQueryOperator(
+                task_id=TASK_ID,
+                sql="Select * from test_table",
+                destination_dataset_table=None,
+                write_disposition="WRITE_EMPTY",
+                allow_large_results=False,
+                flatten_results=None,
+                gcp_conn_id="google_cloud_default",
+                udf_config=None,
+                use_legacy_sql=True,
+                maximum_billing_tier=None,
+                maximum_bytes_billed=None,
+                create_disposition="CREATE_IF_NEEDED",
+                schema_update_options=(),
+                query_params=None,
+                labels=None,
+                priority="INTERACTIVE",
+                time_partitioning=None,
+                api_resource_configs=None,
+                cluster_fields=None,
+                encryption_configuration=encryption_configuration,
+                impersonation_chain=["service-account@myproject.iam.gserviceaccount.com"],
+                impersonation_scopes=[
+                    "https://www.googleapis.com/auth/cloud-platform",
+                    "https://www.googleapis.com/auth/drive",
+                ],
+            )
 
         operator.execute(MagicMock())
         mock_hook.assert_called_with(
@@ -584,31 +591,32 @@ def test_execute(self, mock_hook):
 
     @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
     def test_execute_list(self, mock_hook):
-        operator = BigQueryExecuteQueryOperator(
-            task_id=TASK_ID,
-            sql=[
-                "Select * from test_table",
-                "Select * from other_test_table",
-            ],
-            destination_dataset_table=None,
-            write_disposition="WRITE_EMPTY",
-            allow_large_results=False,
-            flatten_results=None,
-            gcp_conn_id="google_cloud_default",
-            udf_config=None,
-            use_legacy_sql=True,
-            maximum_billing_tier=None,
-            maximum_bytes_billed=None,
-            create_disposition="CREATE_IF_NEEDED",
-            schema_update_options=(),
-            query_params=None,
-            labels=None,
-            priority="INTERACTIVE",
-            time_partitioning=None,
-            api_resource_configs=None,
-            cluster_fields=None,
-            encryption_configuration=None,
-        )
+        with pytest.warns(AirflowProviderDeprecationWarning, match=self.deprecation_message):
+            operator = BigQueryExecuteQueryOperator(
+                task_id=TASK_ID,
+                sql=[
+                    "Select * from test_table",
+                    "Select * from other_test_table",
+                ],
+                destination_dataset_table=None,
+                write_disposition="WRITE_EMPTY",
+                allow_large_results=False,
+                flatten_results=None,
+                gcp_conn_id="google_cloud_default",
+                udf_config=None,
+                use_legacy_sql=True,
+                maximum_billing_tier=None,
+                maximum_bytes_billed=None,
+                create_disposition="CREATE_IF_NEEDED",
+                schema_update_options=(),
+                query_params=None,
+                labels=None,
+                priority="INTERACTIVE",
+                time_partitioning=None,
+                api_resource_configs=None,
+                cluster_fields=None,
+                encryption_configuration=None,
+            )
 
         operator.execute(MagicMock())
         mock_hook.return_value.run_query.assert_has_calls(
@@ -656,40 +664,42 @@ def test_execute_list(self, mock_hook):
 
     @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
     def test_execute_bad_type(self, mock_hook):
-        operator = BigQueryExecuteQueryOperator(
-            task_id=TASK_ID,
-            sql=1,
-            destination_dataset_table=None,
-            write_disposition="WRITE_EMPTY",
-            allow_large_results=False,
-            flatten_results=None,
-            gcp_conn_id="google_cloud_default",
-            udf_config=None,
-            use_legacy_sql=True,
-            maximum_billing_tier=None,
-            maximum_bytes_billed=None,
-            create_disposition="CREATE_IF_NEEDED",
-            schema_update_options=(),
-            query_params=None,
-            labels=None,
-            priority="INTERACTIVE",
-            time_partitioning=None,
-            api_resource_configs=None,
-            cluster_fields=None,
-        )
+        with pytest.warns(AirflowProviderDeprecationWarning, match=self.deprecation_message):
+            operator = BigQueryExecuteQueryOperator(
+                task_id=TASK_ID,
+                sql=1,
+                destination_dataset_table=None,
+                write_disposition="WRITE_EMPTY",
+                allow_large_results=False,
+                flatten_results=None,
+                gcp_conn_id="google_cloud_default",
+                udf_config=None,
+                use_legacy_sql=True,
+                maximum_billing_tier=None,
+                maximum_bytes_billed=None,
+                create_disposition="CREATE_IF_NEEDED",
+                schema_update_options=(),
+                query_params=None,
+                labels=None,
+                priority="INTERACTIVE",
+                time_partitioning=None,
+                api_resource_configs=None,
+                cluster_fields=None,
+            )
 
         with pytest.raises(AirflowException):
             operator.execute(MagicMock())
 
     @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
     def test_bigquery_operator_defaults(self, mock_hook, create_task_instance_of_operator):
-        ti = create_task_instance_of_operator(
-            BigQueryExecuteQueryOperator,
-            dag_id=TEST_DAG_ID,
-            task_id=TASK_ID,
-            sql="Select * from test_table",
-            schema_update_options=None,
-        )
+        with pytest.warns(AirflowProviderDeprecationWarning, match=self.deprecation_message):
+            ti = create_task_instance_of_operator(
+                BigQueryExecuteQueryOperator,
+                dag_id=TEST_DAG_ID,
+                task_id=TASK_ID,
+                sql="Select * from test_table",
+                schema_update_options=None,
+            )
         operator = ti.task
 
         operator.execute(MagicMock())
@@ -722,13 +732,14 @@ def test_bigquery_operator_extra_serialized_field_when_single_query(
         dag_maker,
         create_task_instance_of_operator,
     ):
-        ti = create_task_instance_of_operator(
-            BigQueryExecuteQueryOperator,
-            dag_id=TEST_DAG_ID,
-            execution_date=DEFAULT_DATE,
-            task_id=TASK_ID,
-            sql="SELECT * FROM test_table",
-        )
+        with pytest.warns(AirflowProviderDeprecationWarning, match=self.deprecation_message):
+            ti = create_task_instance_of_operator(
+                BigQueryExecuteQueryOperator,
+                dag_id=TEST_DAG_ID,
+                execution_date=DEFAULT_DATE,
+                task_id=TASK_ID,
+                sql="SELECT * FROM test_table",
+            )
         serialized_dag = dag_maker.get_serialized_data()
         deserialized_dag = SerializedDAG.deserialize_dag(serialized_dag["dag"])
         assert hasattr(deserialized_dag.tasks[0], "sql")
@@ -757,13 +768,14 @@ def test_bigquery_operator_extra_serialized_field_when_multiple_queries(
         dag_maker,
         create_task_instance_of_operator,
    ):
-        ti = create_task_instance_of_operator(
-            BigQueryExecuteQueryOperator,
-            dag_id=TEST_DAG_ID,
-            execution_date=DEFAULT_DATE,
-            task_id=TASK_ID,
-            sql=["SELECT * FROM test_table", "SELECT * FROM test_table2"],
-        )
+        with pytest.warns(AirflowProviderDeprecationWarning, match=self.deprecation_message):
+            ti = create_task_instance_of_operator(
+                BigQueryExecuteQueryOperator,
+                dag_id=TEST_DAG_ID,
+                execution_date=DEFAULT_DATE,
+                task_id=TASK_ID,
+                sql=["SELECT * FROM test_table", "SELECT * FROM test_table2"],
+            )
         serialized_dag = dag_maker.get_serialized_data()
         deserialized_dag = SerializedDAG.deserialize_dag(serialized_dag["dag"])
         assert hasattr(deserialized_dag.tasks[0], "sql")
@@ -801,12 +813,13 @@ def test_bigquery_operator_extra_serialized_field_when_multiple_queries(
     def test_bigquery_operator_extra_link_when_missing_job_id(
         self, mock_hook, create_task_instance_of_operator
     ):
-        ti = create_task_instance_of_operator(
-            BigQueryExecuteQueryOperator,
-            dag_id=TEST_DAG_ID,
-            task_id=TASK_ID,
-            sql="SELECT * FROM test_table",
-        )
+        with pytest.warns(AirflowProviderDeprecationWarning, match=self.deprecation_message):
+            ti = create_task_instance_of_operator(
+                BigQueryExecuteQueryOperator,
+                dag_id=TEST_DAG_ID,
+                task_id=TASK_ID,
+                sql="SELECT * FROM test_table",
+            )
         bigquery_task = ti.task
 
         assert "" == bigquery_task.get_extra_links(ti, BigQueryConsoleLink.name)
@@ -817,13 +830,14 @@ def test_bigquery_operator_extra_link_when_single_query(
         mock_hook,
         create_task_instance_of_operator,
    ):
-        ti = create_task_instance_of_operator(
-            BigQueryExecuteQueryOperator,
-            dag_id=TEST_DAG_ID,
-            execution_date=DEFAULT_DATE,
-            task_id=TASK_ID,
-            sql="SELECT * FROM test_table",
-        )
+        with pytest.warns(AirflowProviderDeprecationWarning, match=self.deprecation_message):
+            ti = create_task_instance_of_operator(
+                BigQueryExecuteQueryOperator,
+                dag_id=TEST_DAG_ID,
+                execution_date=DEFAULT_DATE,
+                task_id=TASK_ID,
+                sql="SELECT * FROM test_table",
+            )
         bigquery_task = ti.task
 
         ti.xcom_push(key="job_id_path", value=TEST_FULL_JOB_ID)
@@ -837,13 +851,14 @@ def test_bigquery_operator_extra_link_when_single_query(
     def test_bigquery_operator_extra_link_when_multiple_query(
         self, mock_hook, create_task_instance_of_operator
     ):
-        ti = create_task_instance_of_operator(
-            BigQueryExecuteQueryOperator,
-            dag_id=TEST_DAG_ID,
-            execution_date=DEFAULT_DATE,
-            task_id=TASK_ID,
-            sql=["SELECT * FROM test_table", "SELECT * FROM test_table2"],
-        )
+        with pytest.warns(AirflowProviderDeprecationWarning, match=self.deprecation_message):
+            ti = create_task_instance_of_operator(
+                BigQueryExecuteQueryOperator,
+                dag_id=TEST_DAG_ID,
+                execution_date=DEFAULT_DATE,
+                task_id=TASK_ID,
+                sql=["SELECT * FROM test_table", "SELECT * FROM test_table2"],
+            )
         bigquery_task = ti.task
 
         ti.xcom_push(key="job_id_path", value=[TEST_FULL_JOB_ID, TEST_FULL_JOB_ID_2])
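
For reference, a minimal sketch of the pattern the updated tests rely on: the deprecated operator is constructed inside `pytest.warns`, so the `AirflowProviderDeprecationWarning` raised at construction time is asserted instead of being suppressed via `tests/deprecations_ignore.yml`. The test name and the `task_id`/`sql` values below are illustrative, not taken from the suite; the match pattern is the one used in the patch.

import pytest

from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.providers.google.cloud.operators.bigquery import BigQueryExecuteQueryOperator


def test_deprecated_operator_emits_warning():
    # The warning is emitted by the operator's __init__, so instantiation
    # happens inside the pytest.warns block.
    message = (
        r"Call to deprecated class BigQueryExecuteQueryOperator\. "
        r"\(This operator is deprecated\. Please use `BigQueryInsertJobOperator`\.\)"
    )
    with pytest.warns(AirflowProviderDeprecationWarning, match=message):
        BigQueryExecuteQueryOperator(task_id="example_task", sql="SELECT 1")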