upgrade version of apache-airflow-providers-google #81

Open · wants to merge 4 commits into base: main

1 change: 1 addition & 0 deletions gcp_airflow_foundations/base_class/dlp_table_config.py
@@ -15,6 +15,7 @@ class DlpTableConfig:
     rows_limit_percent_override: Optional[int] = None
     min_match_count_override: Optional[int] = None
     recurrence_override: Optional[str] = None
+    location: str = "US"
 
     def set_source_config(self, source_config: DlpSourceConfig):
         self.source_config = source_config
6 changes: 4 additions & 2 deletions gcp_airflow_foundations/common/gcp/source_schema/bq.py
@@ -1,7 +1,7 @@
 import json
 import logging
 
-from airflow.contrib.hooks.bigquery_hook import BigQueryHook
+from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
 
 
 def read_schema_from_bq(
@@ -18,6 +18,8 @@ def read_schema_from_bq(
 
     bq_hook = BigQueryHook(bigquery_conn_id=bigquery_conn_id, delegate_to=None)
 
-    schema = bq_hook.get_schema(dataset_id=dataset_id, table_id=table_id, project_id=project_id)
+    schema = bq_hook.get_schema(
+        dataset_id=dataset_id, table_id=table_id, project_id=project_id
+    )
 
     return schema["fields"]
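For reviewers, a minimal usage sketch of the relocated hook import, assuming a configured "google_cloud_default" connection; the project, dataset, and table names below are placeholders:

from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook

# Placeholder project/dataset/table names; assumes the "google_cloud_default"
# Airflow connection points at a GCP project with BigQuery access.
hook = BigQueryHook(gcp_conn_id="google_cloud_default")
schema = hook.get_schema(
    dataset_id="my_dataset", table_id="my_table", project_id="my-project"
)
print([field["name"] for field in schema["fields"]])  # column names of the table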
2 changes: 1 addition & 1 deletion gcp_airflow_foundations/common/gcp/source_schema/gcs.py
@@ -1,7 +1,7 @@
 import json
 import logging
 
-from airflow.contrib.hooks.bigquery_hook import BigQueryHook
+from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
 from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
 
 from urllib.parse import urlparse
27 changes: 15 additions & 12 deletions gcp_airflow_foundations/operators/api/hooks/dataform_hook.py
@@ -5,8 +5,9 @@
 import requests
 import json
 
+
 class DataformHook(BaseHook):
-    '''
+    """
     Airflow Hook to connect to Dataform's API and run Dataform jobs.
     This hook will POST to Dataform's ApiService_RunCreate to initiate a new dataform run.
     If no tags are provided, Dataform will run all.
@@ -38,35 +39,37 @@ class DataformHook(BaseHook):
     - Helpful Medium tutorial: https://medium.com/google-cloud/cloud-composer-apache-airflow-dataform-bigquery-de6e3eaabeb3
 
     ** Note: Schedules must be on the master branch. In Dataform, you'll have to create a branch first and then merge changes into master.
-    '''
+    """
 
-    def __init__(self, dataform_conn_id='datafrom_default') -> None:
+    def __init__(self, dataform_conn_id="datafrom_default") -> None:
         self.conn = self.get_connection(dataform_conn_id)
         self.api_key = self.conn.password
-        self.headers = {'Authorization': f'Bearer {self.api_key}'}
+        self.headers = {"Authorization": f"Bearer {self.api_key}"}
 
-    def run_job(self, project_id: str, environment: str, schedule: str, tags: Optional[str] = []) -> str:
+    def run_job(
+        self, project_id: str, environment: str, schedule: str, tags: Optional[str] = []
+    ) -> str:
 
-        base_url = f'https://api.dataform.co/v1/project/{project_id}/run'
+        base_url = f"https://api.dataform.co/v1/project/{project_id}/run"
         # headers = {'Authorization': f'Bearer {self.api_key}'}
         run_create_request = {
             "environmentName": environment,
             "scheduleName": schedule,
-            "runConfig": {
-                "tags": tags
-            }
+            "runConfig": {"tags": tags},
         }
 
         # ApiService_RunCreate: Initiates new dataform runs within the project, and returns the ID of any created runs.
-        response = requests.post(base_url, data=json.dumps(run_create_request), headers=self.headers)
+        response = requests.post(
+            base_url, data=json.dumps(run_create_request), headers=self.headers
+        )
 
         try:
             # ApiService_RunGet: Returns information about a specific run
-            run_url = base_url + '/' + response.json()['id']
+            run_url = base_url + "/" + response.json()["id"]
             return run_url
         except Exception:
             raise AirflowException(f"Dataform ApiService Error: {response.json()}")
 
     def check_job_status(self, run_url: str) -> str:
         response = requests.get(run_url, headers=self.headers)
-        return response.json()['status']
+        return response.json()["status"]
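A minimal sketch of how this hook is exercised, assuming an Airflow connection whose password field stores the Dataform API key; the connection id, project id, environment, and schedule names below are placeholders:

from gcp_airflow_foundations.operators.api.hooks.dataform_hook import DataformHook

# Placeholder values throughout; the connection's password must hold the
# Dataform API key, since the hook reads it from self.conn.password.
hook = DataformHook(dataform_conn_id="dataform_default")
run_url = hook.run_job(
    project_id="1234567890",    # Dataform project id
    environment="production",   # Dataform environment name
    schedule="daily_schedule",  # schedule defined on the master branch
    tags=["staging"],           # optional; an empty list runs everything
)
print(hook.check_job_status(run_url))  # e.g. "RUNNING" or "SUCCESSFUL"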
@@ -8,7 +8,7 @@
 
 
 class DataformOperator(BaseOperator):
-    '''
+    """
     This operator will use the custom DataformHook to POST to Dataform's ApiService_RunCreate to initiate a new dataform run.
     If no tags are provided, Dataform will run all jobs.
     After a run is created, if the validate_job_submitted parameter is set to True, it will poke Dataform for the run status for a certain amount of time. Use a separate sensor for longer jobs and set validate_job_submitted to False.
@@ -40,20 +40,21 @@ class DataformOperator(BaseOperator):
     - Helpful Medium tutorial: https://medium.com/google-cloud/cloud-composer-apache-airflow-dataform-bigquery-de6e3eaabeb3
 
     ** Note: Schedules must be on the master branch. In Dataform, you'll have to create a branch first and then merge changes into master.
-    '''
+    """
 
     @apply_defaults
     def __init__(
-            self,
-            *,
-            dataform_conn_id: str = 'dataform_default',
-            project_id: str,
-            environment: str,
-            schedule: str,
-            tags: Optional[str] = [],
-            submit_mode: str = 'submit_job_only',
-            retry_period_seconds: Optional[int] = 10,
-            **kwargs: Any) -> None:
+        self,
+        *,
+        dataform_conn_id: str = "dataform_default",
+        project_id: str,
+        environment: str,
+        schedule: str,
+        tags: Optional[str] = [],
+        submit_mode: str = "submit_job_only",
+        retry_period_seconds: Optional[int] = 10,
+        **kwargs: Any,
+    ) -> None:
 
         super().__init__(**kwargs)
         self.dataform_conn_id = dataform_conn_id
@@ -70,28 +71,28 @@ def execute(self, context) -> str:
             project_id=self.project_id,
             environment=self.environment,
             schedule=self.schedule,
-            tags=self.tags
+            tags=self.tags,
         )
 
-        if self.submit_mode == 'submit_job_only':
+        if self.submit_mode == "submit_job_only":
            return run_url
 
-        while self.submit_mode == 'validate_job_submitted':
+        while self.submit_mode == "validate_job_submitted":
            response = dataform_hook.check_job_status(run_url)
-            logging.info(f'dataform job status: {response} run rul: {run_url}')
+            logging.info(f"dataform job status: {response} run rul: {run_url}")
 
-            if response in ('SUCCESSFUL', 'RUNNING'):
+            if response in ("SUCCESSFUL", "RUNNING"):
                return run_url
            else:
                time.sleep(self.retry_period_seconds)
 
-        while self.submit_mode == 'wait_for_job_to_finish':
+        while self.submit_mode == "wait_for_job_to_finish":
            response = dataform_hook.check_job_status(run_url)
-            logging.info(f'dataform job status: {response} run rul: {run_url}')
+            logging.info(f"dataform job status: {response} run rul: {run_url}")
 
-            if response == 'SUCCESSFUL':
+            if response == "SUCCESSFUL":
                return run_url
-            elif response != 'RUNNING':
+            elif response != "RUNNING":
                raise AirflowException(f"Dataform ApiService Error: {response}")
            else:
                time.sleep(self.retry_period_seconds)
-                time.sleep(self.retry_period_seconds)
+                time.sleep(self.retry_period_seconds)
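A hedged sketch of wiring this operator into a DAG; the import path for DataformOperator is assumed (this view does not show the operator's file path), and the DAG id, connection id, and Dataform values are placeholders:

from datetime import datetime

from airflow import DAG

# Import path assumed; the diff view above does not show the operator's file path.
from gcp_airflow_foundations.operators.api.operators.dataform_operator import DataformOperator

with DAG(
    dag_id="dataform_example",  # placeholder DAG id
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    run_dataform = DataformOperator(
        task_id="run_dataform",
        dataform_conn_id="dataform_default",  # placeholder connection id
        project_id="1234567890",              # Dataform project id (placeholder)
        environment="production",
        schedule="daily_schedule",
        submit_mode="submit_job_only",        # push the run URL to XCom and return
    )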
@@ -1,7 +1,7 @@
 import json
 import logging
 
-from airflow.contrib.hooks.bigquery_hook import BigQueryHook
+from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
 
 
 from airflow.models.baseoperator import BaseOperator
19 changes: 13 additions & 6 deletions gcp_airflow_foundations/operators/api/sensors/dataform_sensor.py
@@ -5,22 +5,29 @@
 from typing import Any
 import logging
 
+
 class DataformSensor(BaseSensorOperator):
     @apply_defaults
-    def __init__(self, task_ids: str, dataform_conn_id='dataform_conn_id', **kwargs: Any):
+    def __init__(
+        self, task_ids: str, dataform_conn_id="dataform_conn_id", **kwargs: Any
+    ):
         super().__init__(**kwargs)
         self.task_ids = task_ids
         self.hook = DataformHook(dataform_conn_id=dataform_conn_id)
 
     def poke(self, context):
-        run_url = context['ti'].xcom_pull(key='return_value', task_ids=[self.task_ids])
+        run_url = context["ti"].xcom_pull(key="return_value", task_ids=[self.task_ids])
         response = self.hook.check_job_status(run_url[0])
 
-        if response == 'SUCCESSFUL':
-            logging.info(f"Dataform run completed: SUCCESSFUL see run logs at {run_url}")
+        if response == "SUCCESSFUL":
+            logging.info(
+                f"Dataform run completed: SUCCESSFUL see run logs at {run_url}"
+            )
             return True
-        elif response == 'RUNNING':
+        elif response == "RUNNING":
             logging.info(f"Dataform job running. {run_url}")
             return False
         else:
-            raise AirflowException(f"Dataform run {response}: see run logs at {run_url}")
+            raise AirflowException(
+                f"Dataform run {response}: see run logs at {run_url}"
+            )
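And the sensor half of the pattern, continuing the sketch above; task_ids points at the operator task whose XCom return_value holds the run URL, and the connection id is again a placeholder:

from gcp_airflow_foundations.operators.api.sensors.dataform_sensor import DataformSensor

# Continues the sketch above: polls the run URL pushed by the "run_dataform" task.
wait_for_dataform = DataformSensor(
    task_id="wait_for_dataform",
    task_ids="run_dataform",              # task id of the DataformOperator task
    dataform_conn_id="dataform_default",  # placeholder connection id
    poke_interval=60,                     # seconds between status checks
)
run_dataform >> wait_for_dataform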
@@ -19,7 +19,7 @@
 from gcp_airflow_foundations.enums.facebook import AccountLookupScope, ApiObject
 
 from airflow.models import BaseOperator, Variable
-from airflow.contrib.hooks.bigquery_hook import BigQueryHook
+from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
 
 from google.cloud import bigquery
 
@@ -3,7 +3,7 @@
 import pyarrow.parquet as pq
 import pyarrow
 
-from airflow.contrib.hooks.bigquery_hook import BigQueryHook
+from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
 from airflow.utils.decorators import apply_defaults
 
 from google.cloud import bigquery
@@ -181,6 +181,7 @@ def dlp_to_datacatalog_builder(
         project_id=dlp_results_table_ref.project,
         dataset_id=dlp_results_table_ref.dataset_id,
         table_id=dlp_results_table_ref.table_id,
+        location=table_dlp_config.location,
         do_xcom_push=True,
         min_match_count=table_dlp_config.get_min_match_count(),
         task_group=taskgroup,
@@ -27,6 +27,7 @@ def __init__(
         dataset_id,
         table_id,
         project_id,
+        location,
         min_match_count=0,
         do_xcom_push=True,
         gcp_conn_id="google_cloud_default",
@@ -44,6 +45,7 @@ def __init__(
             gcp_conn_id=gcp_conn_id,
             use_legacy_sql=False,
             impersonation_chain=impersonation_chain,
+            location=location,
         )
         conn = self.hook.get_conn()
         self.cursor = conn.cursor()
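For context on the second hunk above: the new location argument is forwarded to the BigQueryHook so that the DLP results query runs against the right region. A standalone sketch of the same pattern with placeholder values:

from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook

# Placeholder values; in the operator these come from the __init__ arguments.
hook = BigQueryHook(
    gcp_conn_id="google_cloud_default",
    use_legacy_sql=False,
    location="US",  # region of the dataset holding the DLP results table
)
conn = hook.get_conn()
cursor = conn.cursor()
cursor.execute("SELECT 1")  # queries now run against the configured location
print(cursor.fetchone())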
@@ -8,7 +8,7 @@
 )
 
 from airflow.utils.decorators import apply_defaults
-from airflow.contrib.hooks.bigquery_hook import BigQueryHook
+from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
 
 from airflow.exceptions import AirflowException
 
@@ -169,9 +169,7 @@ def pre_execute(self, context) -> None:
 
             self.write_disposition = "WRITE_TRUNCATE"
             self.create_disposition = "CREATE_IF_NEEDED"
-            self.destination_dataset_table = (
-                f"{self.project_id}.{self.data_dataset_name}.{self.data_table_name}${partition_id}"
-            )
+            self.destination_dataset_table = f"{self.project_id}.{self.data_dataset_name}.{self.data_table_name}${partition_id}"
 
         elif self.hds_table_config.hds_table_type == HdsTableType.SCD2:
             sql = sql_helper.create_scd2_sql_with_hash(
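For readers unfamiliar with the $ suffix in the line above: it is BigQuery's partition decorator, which, combined with WRITE_TRUNCATE, replaces only a single partition of the destination table. A small illustration with placeholder names:

# Placeholder values; shows the string the operator builds above.
project_id = "my-project"
data_dataset_name = "analytics_hds"
data_table_name = "orders_snapshot"
partition_id = "20240101"  # partition to overwrite, formatted YYYYMMDD

destination_dataset_table = (
    f"{project_id}.{data_dataset_name}.{data_table_name}${partition_id}"
)
# -> "my-project.analytics_hds.orders_snapshot$20240101"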
@@ -8,7 +8,7 @@
 )
 
 from airflow.utils.decorators import apply_defaults
-from airflow.contrib.hooks.bigquery_hook import BigQueryHook
+from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
 
 from airflow.exceptions import AirflowException
 
@@ -7,7 +7,7 @@
 )
 
 from airflow.utils.decorators import apply_defaults
-from airflow.contrib.hooks.bigquery_hook import BigQueryHook
+from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
 
 from airflow.exceptions import AirflowException
 
@@ -77,7 +77,9 @@ def execute(self, context):
             column_names = list(self.new_column_udfs.keys())
             for column_name in column_names:
                 field = self.new_column_udfs[column_name]
-                source_schema_fields.append({"name": column_name, "type": field["output_type"]})
+                source_schema_fields.append(
+                    {"name": column_name, "type": field["output_type"]}
+                )
 
         if self.ods_table_config:
             schema_xcom[
2 changes: 1 addition & 1 deletion gcp_airflow_foundations/version.py
@@ -1 +1 @@
-__version__ = "0.2.10"
+__version__ = "0.3.10"
31 changes: 4 additions & 27 deletions requirements.txt
@@ -23,39 +23,16 @@ pysftp>=0.2.9
sshtunnel>=0.1.4,<0.2

apache-airflow-providers-facebook==2.1.0
-apache-airflow-providers-google==6.3.0
+apache-airflow-providers-google==6.8.0
pyopenssl==20.0.1
google-ads==14.0.0
google-api-core[grpc,grpcgcp]==1.31.5

google-api-python-client==1.12.8
google-auth-httplib2==0.1.0
google-auth==1.35.0
google-cloud-automl==2.4.2
google-cloud-bigtable==1.7.0
google-cloud-bigquery==2.28.1
google-cloud-build==3.0.0
google-cloud-container==1.0.1
google-cloud-datacatalog==3.4.1
google-cloud-dataproc==3.1.0
google-cloud-dlp==1.0.0
google-cloud-kms==2.6.0
google-cloud-language==1.3.0
google-cloud-logging==2.6.0
google-cloud-memcache==1.0.0
google-cloud-monitoring==2.5.0
google-cloud-os-login==2.3.1
google-cloud-pubsub==2.8.0
google-cloud-redis==2.2.2
google-cloud-secret-manager==1.0.0
google-cloud-spanner==1.19.1
google-cloud-speech==1.3.2
google-cloud-storage==1.42.2
google-cloud-tasks==2.5.1
google-cloud-texttospeech==1.0.1
google-cloud-translate==1.7.0
google-cloud-videointelligence==1.16.1
google-cloud-vision==1.0.0
google-cloud-workflows==1.2.1


grpcio-gcp==0.2.2
httpx==0.19.0
json-merge-patch==0.2
2 changes: 1 addition & 1 deletion tests/test_utils/bq_test_utils.py
@@ -2,7 +2,7 @@
 from google.cloud.bigquery import SchemaField
 import pandas
 from time import sleep
-from airflow.contrib.hooks.bigquery_hook import BigQueryHook
+from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
 
 
 def insert_to_bq_from_csv(csv, project_id, dataset_id, table_id):