diff --git a/airflow/providers/facebook/ads/hooks/ads.py b/airflow/providers/facebook/ads/hooks/ads.py index 63b702c94bc6d..e2e06ec010a02 100644 --- a/airflow/providers/facebook/ads/hooks/ads.py +++ b/airflow/providers/facebook/ads/hooks/ads.py @@ -74,10 +74,6 @@ def __init__( def _get_service(self) -> FacebookAdsApi: """ Returns Facebook Ads Client using a service account""" config = self.facebook_ads_config - missings = [_each for _each in self.client_required_fields if _each not in config] - if missings: - message = "{missings} fields are missing".format(missings=missings) - raise AirflowException(message) return FacebookAdsApi.init(app_id=config["app_id"], app_secret=config["app_secret"], access_token=config["access_token"], @@ -85,16 +81,19 @@ def _get_service(self) -> FacebookAdsApi: api_version=self.api_version) @cached_property - def facebook_ads_config(self) -> None: + def facebook_ads_config(self) -> Dict: """ Gets Facebook ads connection from meta db and sets facebook_ads_config attribute with returned config file """ self.log.info("Fetching fb connection: %s", self.facebook_conn_id) conn = self.get_connection(self.facebook_conn_id) - if "facebook_ads_client" not in conn.extra_dejson: - raise AirflowException("facebook_ads_client not found") - return conn.extra_dejson["facebook_ads_client"] + config = conn.extra_dejson + missings_keys = self.client_required_fields - config.keys() + if missings_keys: + message = "{missings_keys} fields are missing".format(missings_keys=missings_keys) + raise AirflowException(message) + return config def bulk_facebook_report( self, diff --git a/airflow/providers/google/facebook_ads_to_gcs/example_dags/example_ads.py b/airflow/providers/google/cloud/example_dags/example_facebook_ads_to_gcs.py similarity index 74% rename from airflow/providers/google/facebook_ads_to_gcs/example_dags/example_ads.py rename to airflow/providers/google/cloud/example_dags/example_facebook_ads_to_gcs.py index f617cb4c7ae4d..b23d1559cbf3a 100644 
--- a/airflow/providers/google/facebook_ads_to_gcs/example_dags/example_ads.py +++ b/airflow/providers/google/cloud/example_dags/example_facebook_ads_to_gcs.py @@ -24,10 +24,12 @@ from airflow import models from airflow.providers.google.cloud.operators.bigquery import ( - BigQueryCreateEmptyDatasetOperator, BigQueryCreateEmptyTableOperator, BigQueryExecuteQueryOperator, + BigQueryCreateEmptyDatasetOperator, BigQueryCreateEmptyTableOperator, BigQueryDeleteDatasetOperator, + BigQueryExecuteQueryOperator, ) +from airflow.providers.google.cloud.operators.facebook_ads_to_gcs import FacebookAdsReportToGcsOperator +from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator from airflow.providers.google.cloud.operators.gcs_to_bigquery import GCSToBigQueryOperator -from airflow.providers.google.facebook_ads_to_gcs.operators.ads import FacebookAdsReportToGcsOperator from airflow.utils.dates import days_ago # [START howto_GCS_env_variables] @@ -56,13 +58,21 @@ default_args = {"start_date": days_ago(1)} with models.DAG( - "example_fb_operator", + "example_facebook_ads_to_gcs", default_args=default_args, schedule_interval=None, # Override to match your needs ) as dag: - create_dataset = BigQueryCreateEmptyDatasetOperator(task_id="create-dataset", - dataset_id=DATASET_NAME) + create_bucket = GCSCreateBucketOperator( + task_id="create_bucket", + bucket_name=GCS_BUCKET, + project_id=GCP_PROJECT_ID, + ) + + create_dataset = BigQueryCreateEmptyDatasetOperator( + task_id="create_dataset", + dataset_id=DATASET_NAME, + ) create_table = BigQueryCreateEmptyTableOperator( task_id="create_table", @@ -77,7 +87,7 @@ ], ) - # [START howto_FB_ADS_to_gcs_operator] + # [START howto_operator_facebook_ads_to_gcs] run_operator = FacebookAdsReportToGcsOperator( task_id='run_fetch_data', start_date=days_ago(2), @@ -88,23 +98,33 @@ gcp_conn_id=GCS_CONN_ID, object_name=GCS_OBJ_PATH, ) - # [END howto_FB_ADS_to_gcs_operator] + # [END 
howto_operator_facebook_ads_to_gcs] - # [START howto_operator_gcs_to_bq] load_csv = GCSToBigQueryOperator( task_id='gcs_to_bq_example', bucket=GCS_BUCKET, source_objects=[GCS_OBJ_PATH], destination_project_dataset_table=f"{DATASET_NAME}.{TABLE_NAME}", - write_disposition='WRITE_TRUNCATE') - # [END howto_operator_gcs_to_bq] + write_disposition='WRITE_TRUNCATE' + ) - # [START howto_operator_read_data_from_gcs] read_data_from_gcs_many_chunks = BigQueryExecuteQueryOperator( task_id="read_data_from_gcs_many_chunks", sql=f"SELECT COUNT(*) FROM `{GCP_PROJECT_ID}.{DATASET_NAME}.{TABLE_NAME}`", use_legacy_sql=False, ) - # [END howto_operator_read_data_from_gcs] - create_dataset >> create_table >> run_operator >> load_csv >> read_data_from_gcs_many_chunks + delete_bucket = GCSDeleteBucketOperator( + task_id="delete_bucket", + bucket_name=GCS_BUCKET, + ) + + delete_dataset = BigQueryDeleteDatasetOperator( + task_id="delete_dataset", + project_id=GCP_PROJECT_ID, + dataset_id=DATASET_NAME, + delete_contents=True, + ) + + create_bucket >> create_dataset >> create_table >> run_operator >> load_csv + load_csv >> read_data_from_gcs_many_chunks >> delete_bucket >> delete_dataset diff --git a/airflow/providers/google/facebook_ads_to_gcs/operators/ads.py b/airflow/providers/google/cloud/operators/facebook_ads_to_gcs.py similarity index 96% rename from airflow/providers/google/facebook_ads_to_gcs/operators/ads.py rename to airflow/providers/google/cloud/operators/facebook_ads_to_gcs.py index 57e5f3f06e1e3..0b9140ec2f36c 100644 --- a/airflow/providers/google/facebook_ads_to_gcs/operators/ads.py +++ b/airflow/providers/google/cloud/operators/facebook_ads_to_gcs.py @@ -42,6 +42,10 @@ class FacebookAdsReportToGcsOperator(BaseOperator): For more information on the Facebook Ads Python SDK, take a look at the docs: https://github.com/facebook/facebook-python-business-sdk + .. 
seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:FacebookAdsReportToGcsOperator` + :param bucket: The GCS bucket to upload to :type bucket: str :param obj: GCS path to save the object. Must be the full file path (ex. `path/to/file.txt`) diff --git a/airflow/providers/google/facebook_ads_to_gcs/__init__.py b/airflow/providers/google/facebook_ads_to_gcs/__init__.py deleted file mode 100644 index 13a83393a9124..0000000000000 --- a/airflow/providers/google/facebook_ads_to_gcs/__init__.py +++ /dev/null @@ -1,16 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. diff --git a/airflow/providers/google/facebook_ads_to_gcs/example_dags/__init__.py b/airflow/providers/google/facebook_ads_to_gcs/example_dags/__init__.py deleted file mode 100644 index 217e5db960782..0000000000000 --- a/airflow/providers/google/facebook_ads_to_gcs/example_dags/__init__.py +++ /dev/null @@ -1,17 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. 
The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. diff --git a/airflow/providers/google/facebook_ads_to_gcs/operators/__init__.py b/airflow/providers/google/facebook_ads_to_gcs/operators/__init__.py deleted file mode 100644 index 13a83393a9124..0000000000000 --- a/airflow/providers/google/facebook_ads_to_gcs/operators/__init__.py +++ /dev/null @@ -1,16 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
diff --git a/airflow/utils/db.py b/airflow/utils/db.py index f7a9f4a9351da..9e5f87068d867 100644 --- a/airflow/utils/db.py +++ b/airflow/utils/db.py @@ -245,15 +245,12 @@ def create_default_connections(session=None): Connection( conn_id="facebook_default", conn_type="facebook_social", - schema=""" - { - "facebook_ads_client": { - "account_id": "act_123456789", - "app_id": "1234567890", - "app_secret": "1f45tghxxxx12345", - "access_token": "ABcdEfghiJKlmnoxxyz" + extra=""" + { "account_id": "", + "app_id": "", + "app_secret": "", + "access_token": "" } - } """, ), session diff --git a/docs/autoapi_templates/index.rst b/docs/autoapi_templates/index.rst index 57e1f5c56e66b..d3513b7133e3a 100644 --- a/docs/autoapi_templates/index.rst +++ b/docs/autoapi_templates/index.rst @@ -110,8 +110,6 @@ All operators are in the following packages: airflow/providers/exasol/operators/index - airflow/providers/google/facebook_ads_to_gcs/operators/index - airflow/providers/ftp/sensors/index airflow/providers/google/ads/operators/index diff --git a/docs/howto/operator/gcp/facebook_ads_to_gcs.rst b/docs/howto/operator/gcp/facebook_ads_to_gcs.rst new file mode 100644 index 0000000000000..a686878c65cdd --- /dev/null +++ b/docs/howto/operator/gcp/facebook_ads_to_gcs.rst @@ -0,0 +1,52 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. 
See the License for the + specific language governing permissions and limitations + under the License. + + + +Facebook Ads To GCS Operators +============================== + +.. contents:: + :depth: 1 + :local: + +Prerequisite Tasks +^^^^^^^^^^^^^^^^^^ + +.. include:: _partials/prerequisite_tasks.rst + +.. _howto/operator:FacebookAdsReportToGcsOperator: + +FacebookAdsReportToGcsOperator +------------------------------ + +Use the +:class:`~airflow.providers.google.cloud.operators.facebook_ads_to_gcs.FacebookAdsReportToGcsOperator` +to execute a Facebook ads report fetch and load to GCS. + +.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_facebook_ads_to_gcs.py + :language: python + :start-after: [START howto_operator_facebook_ads_to_gcs] + :end-before: [END howto_operator_facebook_ads_to_gcs] + +Reference +^^^^^^^^^ + +For further information, look at: + +* `Client Library Documentation <https://github.com/facebook/facebook-python-business-sdk>`__ +* `Product Documentation <https://developers.facebook.com/docs/business-sdk/>`__ diff --git a/docs/operators-and-hooks-ref.rst b/docs/operators-and-hooks-ref.rst index c6de4160bf283..f17daad0233f6 100644 --- a/docs/operators-and-hooks-ref.rst +++ b/docs/operators-and-hooks-ref.rst @@ -806,8 +806,8 @@ These integrations allow you to copy data from/to Google Cloud Platform. 
* - `Facebook Ads <http://business.facebook.com>`__ - `Google Cloud Storage (GCS) <https://cloud.google.com/gcs/>`__ - - - - :mod:`airflow.providers.google.facebook_ads_to_gcs.operators.ads` + - :doc:`How to use <howto/operator/gcp/facebook_ads_to_gcs>` + - :mod:`airflow.providers.google.cloud.operators.facebook_ads_to_gcs` * - `Google BigQuery <https://cloud.google.com/bigquery/>`__ - `MySQL <https://www.mysql.com/>`__ diff --git a/tests/providers/facebook/ads/hooks/test_ads.py b/tests/providers/facebook/ads/hooks/test_ads.py index 7d975d41bf679..25f63ae3221fd 100644 --- a/tests/providers/facebook/ads/hooks/test_ads.py +++ b/tests/providers/facebook/ads/hooks/test_ads.py @@ -22,12 +22,10 @@ API_VERSION = "api_version" EXTRAS = { - "facebook_ads_client": { - "account_id": "act_12345", - "app_id": "12345", - "app_secret": "1fg444", - "access_token": "Ab35gf7E" - } + "account_id": "act_12345", + "app_id": "12345", + "app_secret": "1fg444", + "access_token": "Ab35gf7E" } FIELDS = [ "campaign_name", @@ -55,11 +53,10 @@ class TestFacebookAdsReportingHook: def test_get_service(self, mock_api, mock_hook): mock_hook._get_service() api = mock_api.init - creds = EXTRAS["facebook_ads_client"] - api.assert_called_once_with(app_id=creds["app_id"], - app_secret=creds["app_secret"], - access_token=creds["access_token"], - account_id=creds["account_id"], + api.assert_called_once_with(app_id=EXTRAS["app_id"], + app_secret=EXTRAS["app_secret"], + access_token=EXTRAS["access_token"], + account_id=EXTRAS["account_id"], api_version=API_VERSION) @mock.patch("airflow.providers.facebook.ads.hooks.ads.AdAccount") diff --git a/tests/providers/google/facebook_ads_to_gcs/operators/test_ads.py b/tests/providers/google/cloud/operators/test_facebook_ads_to_gcs.py similarity index 90% rename from tests/providers/google/facebook_ads_to_gcs/operators/test_ads.py rename to tests/providers/google/cloud/operators/test_facebook_ads_to_gcs.py index 2406d5712f238..a2b8b6b9dade5 100644 --- a/tests/providers/google/facebook_ads_to_gcs/operators/test_ads.py +++ b/tests/providers/google/cloud/operators/test_facebook_ads_to_gcs.py @@ -16,7 +16,7 @@ # under the 
License. from unittest import mock -from airflow.providers.google.facebook_ads_to_gcs.operators.ads import FacebookAdsReportToGcsOperator +from airflow.providers.google.cloud.operators.facebook_ads_to_gcs import FacebookAdsReportToGcsOperator GCS_BUCKET = "airflow_bucket_fb" GCS_OBJ_PATH = "Temp/this_is_my_report_json.json" @@ -47,8 +47,8 @@ class TestFacebookAdsReportToGcsOperator: - @mock.patch("airflow.providers.google.facebook_ads_to_gcs.operators.ads.FacebookAdsReportingHook") - @mock.patch("airflow.providers.google.facebook_ads_to_gcs.operators.ads.GCSHook") + @mock.patch("airflow.providers.google.cloud.operators.facebook_ads_to_gcs.FacebookAdsReportingHook") + @mock.patch("airflow.providers.google.cloud.operators.facebook_ads_to_gcs.GCSHook") def test_execute(self, mock_gcs_hook, mock_ads_hook): mock_ads_hook.return_value.bulk_facebook_report.return_value = FACEBOOK_RETURN_VALUE op = FacebookAdsReportToGcsOperator(facebook_conn_id=FACEBOOK_ADS_CONN_ID, diff --git a/tests/providers/google/cloud/operators/test_facebook_ads_to_gcs_system.py b/tests/providers/google/cloud/operators/test_facebook_ads_to_gcs_system.py new file mode 100644 index 0000000000000..bf107e6e6e985 --- /dev/null +++ b/tests/providers/google/cloud/operators/test_facebook_ads_to_gcs_system.py @@ -0,0 +1,80 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. +import json +import os +from contextlib import contextmanager + +import pytest + +from airflow.exceptions import AirflowException +from airflow.models import Connection +from airflow.utils.process_utils import patch_environ +from tests.providers.google.cloud.utils.gcp_authenticator import GCP_BIGQUERY_KEY +from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest, provide_gcp_context + +CREDENTIALS_DIR = os.environ.get('CREDENTIALS_DIR', '/files/airflow-breeze-config/keys') +FACEBOOK_KEY = 'facebook.json' +FACEBOOK_CREDENTIALS_PATH = os.path.join(CREDENTIALS_DIR, FACEBOOK_KEY) +CONNECTION_TYPE = os.environ.get('CONNECTION_TYPE', 'facebook_social') +FACEBOOK_CONNECTION_ID = os.environ.get('FACEBOOK_CONNECTION_ID', 'facebook_default') +CONFIG_REQUIRED_FIELDS = ["app_id", + "app_secret", + "access_token", + "account_id"] + + +@contextmanager +def provide_facebook_connection( + key_file_path: str +): + """ + Context manager that provides a temporary Facebook connection by setting the + AIRFLOW_CONN_{FACEBOOK_CONNECTION_ID} (upper-cased) environment variable. It builds a new + connection whose extra field holds the credentials loaded from the provided JSON key file. + + :param key_file_path: Path to the .json file with the Facebook credentials. + :type key_file_path: str + """ + if not key_file_path.endswith(".json"): + raise AirflowException( + "Use a JSON key file." 
+ ) + with open(key_file_path, 'r') as credentials: + creds = json.load(credentials) + missings_keys = CONFIG_REQUIRED_FIELDS - creds.keys() + if missings_keys: + message = "{missings_keys} fields are missing".format(missings_keys=missings_keys) + raise AirflowException(message) + conn = Connection( + conn_id=FACEBOOK_CONNECTION_ID, + conn_type=CONNECTION_TYPE, + extra=json.dumps(creds) + ) + with patch_environ({f"AIRFLOW_CONN_{conn.conn_id.upper()}": conn.get_uri()}): + yield + + +@pytest.mark.credential_file(FACEBOOK_KEY) +@pytest.mark.credential_file(GCP_BIGQUERY_KEY) +@pytest.mark.system("google.cloud") +class FacebookAdsToGcsExampleDagsSystemTest(GoogleSystemTest): + + @provide_gcp_context(GCP_BIGQUERY_KEY) + @provide_facebook_connection(FACEBOOK_CREDENTIALS_PATH) + def test_dag_example(self): + self.run_dag("example_facebook_ads_to_gcs", CLOUD_DAG_FOLDER) diff --git a/tests/providers/google/facebook_ads_to_gcs/__init__.py b/tests/providers/google/facebook_ads_to_gcs/__init__.py deleted file mode 100644 index 13a83393a9124..0000000000000 --- a/tests/providers/google/facebook_ads_to_gcs/__init__.py +++ /dev/null @@ -1,16 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
diff --git a/tests/providers/google/facebook_ads_to_gcs/operators/__init__.py b/tests/providers/google/facebook_ads_to_gcs/operators/__init__.py deleted file mode 100644 index 13a83393a9124..0000000000000 --- a/tests/providers/google/facebook_ads_to_gcs/operators/__init__.py +++ /dev/null @@ -1,16 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License.