Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@
from airflow.providers.google.common.hooks.base_google import get_field

if TYPE_CHECKING:
from google.ads.googleads.v19.services.services.customer_service import CustomerServiceClient
from google.ads.googleads.v19.services.services.google_ads_service import GoogleAdsServiceClient
from google.ads.googleads.v19.services.services.google_ads_service.pagers import SearchPager
from google.ads.googleads.v19.services.types.google_ads_service import GoogleAdsRow
from google.ads.googleads.v20.services.services.customer_service import CustomerServiceClient
from google.ads.googleads.v20.services.services.google_ads_service import GoogleAdsServiceClient
from google.ads.googleads.v20.services.services.google_ads_service.pagers import SearchPager
from google.ads.googleads.v20.services.types.google_ads_service import GoogleAdsRow


class GoogleAdsHook(BaseHook):
Expand Down
151 changes: 139 additions & 12 deletions providers/google/tests/system/google/ads/example_ads.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,32 +17,53 @@
# under the License.
"""
Example Airflow DAG that shows how to use GoogleAdsToGcsOperator.

In order to run this test, make sure you followed steps:
1. In your GCP project create a service account that will be used to operate on Google Ads.
The name should be in format `google-ads-service-account@{PROJECT_ID}.iam.gserviceaccount.com`
2. Generate a key for this service account and store it in the Secret Manager
under the name `google_ads_service_account_key`.
3. Give this service account Editor permissions.
4. Make sure Google Ads API is enabled in your GCP project.
5. Login to https://ads.google.com
6. In the Admin section go to Access and Security and give your GCP service account Admin permissions.
7. Store values of your developer token and client ID to Secret Manager under names `google_ads_client_id`
and `google_ads_developer_token`.
"""

from __future__ import annotations

import json
import logging
import os
from datetime import datetime

from google.cloud.exceptions import NotFound

from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.providers.google.ads.operators.ads import GoogleAdsListAccountsOperator
from airflow.providers.google.ads.transfers.ads_to_gcs import GoogleAdsToGcsOperator
from airflow.providers.google.cloud.hooks.secret_manager import GoogleCloudSecretManagerHook
from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
from airflow.utils.trigger_rule import TriggerRule

from system.google import DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID
from tests_common.test_utils.api_client_helpers import create_airflow_connection, delete_airflow_connection

# [START howto_google_ads_env_variables]
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") or DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID
API_VERSION = "v19"
PROJECT_ID = os.environ.get("SYSTEM_TESTS_PROJECT_ID", "default")
API_VERSION = "v20"

DAG_ID = "example_google_ads"
DAG_ID = "google_ads"

GOOGLE_ADS_CLIENT_ID = "google_ads_client_id"
GOOGLE_ADS_SERVICE_ACCOUNT_KEY = "google_ads_service_account_key"
GOOGLE_ADS_DEVELOPER_TOKEN = "google_ads_developer_token"

BUCKET_NAME = f"bucket_ads_{ENV_ID}"
CLIENT_IDS = ["1111111111", "2222222222"]
GCS_OBJ_PATH = "folder_name/google-ads-api-results.csv"
GCS_ACCOUNTS_CSV = "folder_name/accounts.csv"
GCS_OBJ_PATH = f"gs://{BUCKET_NAME}/google-ads-api-results.csv"
GCS_ACCOUNTS_CSV = "accounts.csv"
QUERY = """
SELECT
segments.date,
Expand All @@ -61,6 +82,9 @@
segments.date >= '2020-02-01'
AND segments.date <= '2020-02-29'
"""
CONNECTION_GLOUD_ID = f"connection_cloud_{DAG_ID}_{ENV_ID}"
CONNECTION_ADS_ID = "google_ads_default"
CONNECTION_TYPE = "google_cloud_platform"

FIELDS_TO_EXTRACT = [
"segments.date.value",
Expand All @@ -76,47 +100,150 @@
]
# [END howto_google_ads_env_variables]

log = logging.getLogger(__name__)


def get_secret(secret_id: str) -> str:
hook = GoogleCloudSecretManagerHook()
if hook.secret_exists(secret_id=secret_id):
return hook.access_secret(secret_id=secret_id).payload.data.decode()
raise NotFound("The secret '%s' not found", secret_id)


with DAG(
DAG_ID,
schedule="@once",
start_date=datetime(2021, 1, 1),
catchup=False,
tags=["example", "ads"],
render_template_as_native_obj=True,
) as dag:

@task
def get_google_ads_client_id():
return get_secret(secret_id=GOOGLE_ADS_CLIENT_ID).strip()

get_google_ads_client_id_task = get_google_ads_client_id()

@task
def get_google_ads_service_account_key():
return get_secret(secret_id=GOOGLE_ADS_SERVICE_ACCOUNT_KEY)

get_google_ads_service_account_key_task = get_google_ads_service_account_key()

@task
def get_google_ads_developer_token():
return get_secret(secret_id=GOOGLE_ADS_DEVELOPER_TOKEN).strip()

get_google_ads_developer_token_task = get_google_ads_developer_token()

@task
def create_connection_gcloud_for_ads(connection_id: str, key) -> None:
conn_extra_json = json.dumps(
{
"keyfile_dict": key,
"project": PROJECT_ID,
"scope": "https://www.googleapis.com/auth/adwords, https://www.googleapis.com/auth/cloud-platform",
}
)
create_airflow_connection(
connection_id=connection_id,
connection_conf={"conn_type": CONNECTION_TYPE, "extra": conn_extra_json},
)

create_connection_gcloud_for_ads = create_connection_gcloud_for_ads(
connection_id=CONNECTION_GLOUD_ID, key=get_google_ads_service_account_key_task
)

@task
def create_connection_ads(connection_id: str, token) -> None:
conn_extra_json = json.dumps(
{
"google_ads_client": {
"developer_token": token,
# this parameter is required to be not None, but the actual content will be overwritten, so can be some dummy string
"json_key_file_path": "some_string",
"impersonated_email": f"google-ads-service-account@{PROJECT_ID}.iam.gserviceaccount.com",
"use_proto_plus": False,
},
"project": PROJECT_ID,
"scope": "https://www.googleapis.com/auth/adwords, https://www.googleapis.com/auth/cloud-platform",
}
)
create_airflow_connection(
connection_id=connection_id,
connection_conf={"conn_type": CONNECTION_TYPE, "extra": conn_extra_json},
)

create_connection_ads_task = create_connection_ads(
connection_id=CONNECTION_ADS_ID, token=get_google_ads_developer_token_task
)

create_bucket = GCSCreateBucketOperator(
task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID
task_id="create_bucket",
bucket_name=BUCKET_NAME,
project_id=PROJECT_ID,
gcp_conn_id=CONNECTION_GLOUD_ID,
)

# [START howto_google_ads_to_gcs_operator]
run_operator = GoogleAdsToGcsOperator(
client_ids=CLIENT_IDS,
client_ids=[get_google_ads_client_id_task],
query=QUERY,
attributes=FIELDS_TO_EXTRACT,
obj=GCS_OBJ_PATH,
bucket=BUCKET_NAME,
task_id="run_operator",
api_version=API_VERSION,
gcp_conn_id=CONNECTION_GLOUD_ID,
)
# [END howto_google_ads_to_gcs_operator]

# [START howto_ads_list_accounts_operator]
list_accounts = GoogleAdsListAccountsOperator(
task_id="list_accounts", bucket=BUCKET_NAME, object_name=GCS_ACCOUNTS_CSV
task_id="list_accounts",
bucket=BUCKET_NAME,
object_name=GCS_ACCOUNTS_CSV,
api_version=API_VERSION,
gcp_conn_id=CONNECTION_GLOUD_ID,
)
# [END howto_ads_list_accounts_operator]

delete_bucket = GCSDeleteBucketOperator(
task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
task_id="delete_bucket",
bucket_name=BUCKET_NAME,
gcp_conn_id=CONNECTION_GLOUD_ID,
trigger_rule=TriggerRule.ALL_DONE,
)

@task(task_id="delete_connection_gloud")
def delete_connection_gloud(connection_id: str) -> None:
delete_airflow_connection(connection_id=connection_id)

delete_connection_gloud_task = delete_connection_gloud(connection_id=CONNECTION_GLOUD_ID)

@task(task_id="delete_connection_ads")
def delete_connection_ads(connection_id: str) -> None:
delete_airflow_connection(connection_id=connection_id)

delete_connection_ads_task = delete_connection_ads(connection_id=CONNECTION_ADS_ID)

(
# TEST SETUP
create_bucket
[
get_google_ads_client_id_task,
get_google_ads_service_account_key_task,
get_google_ads_developer_token_task,
]
>> create_connection_gcloud_for_ads # type: ignore
>> create_connection_ads_task
>> create_bucket
# TEST BODY
>> run_operator
>> list_accounts
# TEST TEARDOWN
>> delete_bucket
>> [delete_connection_gloud_task, delete_connection_ads_task]
)

from tests_common.test_utils.watcher import watcher
Expand Down