Feature/formatting #86

Open
wants to merge 5 commits into base: main

8 changes: 4 additions & 4 deletions docker-compose.yaml
@@ -9,7 +9,7 @@ x-airflow-common:
&airflow-common-env
# uncomment to allow using user IAM for access
#AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT: 'google-cloud-platform://user:password@host/schema?extra__google_cloud_platform__scope=${GCP_AUTH_SCOPE}&extra__google_cloud_platform__project=${GCP_PROJECT_ID}'
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow1!@postgres/airflow
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
env_file:
- ./variables/docker-env-vars
- ./variables/docker-env-secrets # added to gitignore
@@ -34,7 +34,7 @@ services:
image: postgres:13
environment:
POSTGRES_USER: airflow
POSTGRES_PASSWORD: airflow1!
POSTGRES_PASSWORD: airflow
POSTGRES_DB: airflow
volumes:
- postgres-db-volume:/var/lib/postgresql/data
@@ -94,8 +94,8 @@ services:
_AIRFLOW_DB_UPGRADE: 'true'
_AIRFLOW_WWW_USER_CREATE: 'true'
_AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
_AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow1!}
_AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
user: "0:${AIRFLOW_GID:-0}"

volumes:
postgres-db-volume:
postgres-db-volume:
7 changes: 5 additions & 2 deletions gcp_airflow_foundations/common/gcp/load_builder.py
@@ -19,7 +19,9 @@
ParseSchema,
)
from gcp_airflow_foundations.source_class.schema_source_config import SchemaSourceConfig
from gcp_airflow_foundations.operators.gcp.create_dataset import CustomBigQueryCreateEmptyDatasetOperator
from gcp_airflow_foundations.operators.gcp.create_dataset import (
CustomBigQueryCreateEmptyDatasetOperator,
)


def load_builder(
@@ -137,7 +139,7 @@ def load_builder(
dataset_id=data_source.dlp_config.results_dataset_id,
location=location,
exists_ok=True,
dag=dag
dag=dag,
)

dlp_tasks_configs = [
@@ -161,6 +163,7 @@
ods_dlp_task_groups = schedule_dlp_to_datacatalog_taskgroup_multiple_tables(
table_configs=dlp_tasks_configs,
table_dlp_config=dlp_table_config,
location=location,
next_task=done,
dag=dag,
)
9 changes: 4 additions & 5 deletions gcp_airflow_foundations/common/gcp/source_schema/bq.py
@@ -1,7 +1,4 @@
import json
import logging

from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook


def read_schema_from_bq(
@@ -18,6 +15,8 @@ def read_schema_from_bq(

bq_hook = BigQueryHook(bigquery_conn_id=bigquery_conn_id, delegate_to=None)

schema = bq_hook.get_schema(dataset_id=dataset_id, table_id=table_id, project_id=project_id)
schema = bq_hook.get_schema(
dataset_id=dataset_id, table_id=table_id, project_id=project_id
)

return schema["fields"]
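
For context, a minimal usage sketch of read_schema_from_bq; the keyword names are inferred from the function body above (the full signature is collapsed in this view), and the connection, project, dataset, and table values are placeholders:

fields = read_schema_from_bq(
    bigquery_conn_id="google_cloud_default",  # placeholder connection ID
    dataset_id="my_dataset",
    table_id="my_table",
    project_id="my-project",
)
# Each entry is a BigQuery schema field dict, e.g.
# {"name": "event_id", "type": "STRING", "mode": "NULLABLE"}
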
3 changes: 0 additions & 3 deletions gcp_airflow_foundations/common/gcp/source_schema/gcs.py
@@ -1,7 +1,4 @@
import json
import logging

from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook

from urllib.parse import urlparse
@@ -1,7 +1,7 @@
import json
import logging

from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook


from airflow.models.baseoperator import BaseOperator
@@ -1,25 +1,19 @@
import csv
import tempfile
import warnings
import time
from typing import Any, Dict, List, Optional, Sequence, Union
from random import shuffle
import pandas as pd
from datetime import datetime
from dateutil.relativedelta import relativedelta

import pyarrow.parquet as pq
import pyarrow

from airflow.exceptions import AirflowException

from gcp_airflow_foundations.operators.facebook.hooks.ads import (
CustomFacebookAdsReportingHook,
)
from gcp_airflow_foundations.enums.facebook import AccountLookupScope, ApiObject

from airflow.models import BaseOperator, Variable
from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook

from google.cloud import bigquery

@@ -1,9 +1,8 @@
import pandas as pd
import json
import pyarrow.parquet as pq
import pyarrow

from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
from airflow.utils.decorators import apply_defaults

from google.cloud import bigquery
@@ -34,6 +34,7 @@ def schedule_dlp_to_datacatalog_taskgroup(
project_id: str,
table_id: str,
dataset_id: str,
location: str,
table_dlp_config: DlpTableConfig,
next_task: BaseOperator,
dag,
@@ -52,6 +53,7 @@ def schedule_dlp_to_datacatalog_taskgroup(
project_id=project_id,
table_id=table_id,
dataset_id=dataset_id,
location=location,
table_dlp_config=table_dlp_config,
next_task=next_task,
dag=dag,
@@ -72,7 +74,11 @@ def schedule_dlp_to_datacatalog_taskgroup(


def schedule_dlp_to_datacatalog_taskgroup_multiple_tables(
table_configs: list, table_dlp_config: DlpTableConfig, next_task: BaseOperator, dag
table_configs: list,
table_dlp_config: DlpTableConfig,
location: str,
next_task: BaseOperator,
dag,
):
"""
Check if DLP should run, and run it on multiple tables
@@ -88,6 +94,7 @@ def schedule_dlp_to_datacatalog_taskgroup_multiple_tables(
for table_config in table_configs:
dlp_task = dlp_to_datacatalog_builder(
taskgroup=taskgroup,
location=location,
datastore=table_config["datastore"],
project_id=table_config["project_id"],
table_id=table_config["table_id"],
@@ -125,6 +132,7 @@ def dlp_to_datacatalog_builder(
project_id: str,
table_id: str,
dataset_id: str,
location: str,
table_dlp_config: DlpTableConfig,
next_task: BaseOperator,
dag,
@@ -181,6 +189,7 @@ def dlp_to_datacatalog_builder(
project_id=dlp_results_table_ref.project,
dataset_id=dlp_results_table_ref.dataset_id,
table_id=dlp_results_table_ref.table_id,
location=location,
do_xcom_push=True,
min_match_count=table_dlp_config.get_min_match_count(),
task_group=taskgroup,
@@ -18,6 +18,8 @@ class DlpBQInspectionResultsOperator(BaseOperator):
:type dataset_id: str
:param min_match_count: Minimum number of findings per column/likelihood level pair
:type min_match_count: int
:param location: Location of the source dataset
:type location: str
"""

@apply_defaults
@@ -27,6 +29,7 @@ def __init__(
dataset_id,
table_id,
project_id,
location,
min_match_count=0,
do_xcom_push=True,
gcp_conn_id="google_cloud_default",
@@ -44,6 +47,7 @@ def __init__(
gcp_conn_id=gcp_conn_id,
use_legacy_sql=False,
impersonation_chain=impersonation_chain,
location=location,
)
conn = self.hook.get_conn()
self.cursor = conn.cursor()
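
A minimal instantiation sketch of the operator with the new location argument; only the constructor parameters come from this file, while the task_id, dag, and all values are illustrative assumptions (the module path is not shown in this view, so the import is omitted):

read_dlp_results = DlpBQInspectionResultsOperator(
    task_id="read_dlp_inspection_results",  # assumed task wiring
    project_id="my-project",                # placeholder values
    dataset_id="dlp_results",
    table_id="inspection_findings",
    location="us",                          # dataset location
    min_match_count=1,
    do_xcom_push=True,
    gcp_conn_id="google_cloud_default",
    dag=dag,                                # assumes an existing DAG object
)
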
@@ -1,14 +1,12 @@
from typing import Optional
from datetime import datetime

from airflow.models import BaseOperator, BaseOperatorLink
from airflow.contrib.operators.bigquery_operator import (
BigQueryOperator,
BigQueryCreateEmptyTableOperator,
)

from airflow.utils.decorators import apply_defaults
from airflow.contrib.hooks.bigquery_hook import BigQueryHook

from airflow.exceptions import AirflowException

@@ -171,9 +169,7 @@ def pre_execute(self, context) -> None:

self.write_disposition = "WRITE_TRUNCATE"
self.create_disposition = "CREATE_IF_NEEDED"
self.destination_dataset_table = (
f"{self.project_id}.{self.data_dataset_name}.{self.data_table_name}${partition_id}"
)
self.destination_dataset_table = f"{self.project_id}.{self.data_dataset_name}.{self.data_table_name}${partition_id}"

elif self.hds_table_config.hds_table_type == HdsTableType.SCD2:
sql = sql_helper.create_scd2_sql_with_hash(
@@ -1,14 +1,11 @@
from typing import Optional
from datetime import datetime

from airflow.models import BaseOperator, BaseOperatorLink
from airflow.contrib.operators.bigquery_operator import (
BigQueryOperator,
BigQueryCreateEmptyTableOperator,
)

from airflow.utils.decorators import apply_defaults
from airflow.contrib.hooks.bigquery_hook import BigQueryHook

from airflow.exceptions import AirflowException

@@ -1,27 +1,13 @@
from typing import Optional

from airflow.models import BaseOperator, BaseOperatorLink
from airflow.contrib.operators.bigquery_operator import (
BigQueryOperator,
BigQueryCreateEmptyTableOperator,
)

from airflow.utils.decorators import apply_defaults
from airflow.contrib.hooks.bigquery_hook import BigQueryHook

from airflow.exceptions import AirflowException

import logging

from gcp_airflow_foundations.common.gcp.source_schema.gcs import read_schema_from_gcs
from gcp_airflow_foundations.common.gcp.ods.schema_utils import parse_ods_schema
from gcp_airflow_foundations.common.gcp.hds.schema_utils import parse_hds_schema


from gcp_airflow_foundations.base_class.ods_table_config import OdsTableConfig
from gcp_airflow_foundations.base_class.hds_table_config import HdsTableConfig


class ParseSchema(BaseOperator):
@apply_defaults
def __init__(
@@ -45,8 +31,8 @@ def __init__(
self.new_column_udfs = new_column_udfs
self.data_source = data_source
self.table_config = table_config
self.ods_table_id = ods_table_id,
self.hds_table_id = hds_table_id,
self.ods_table_id = (ods_table_id,)
self.hds_table_id = (hds_table_id,)
self.ods_table_config = table_config.ods_config
self.hds_table_config = table_config.hds_config

@@ -80,7 +66,9 @@ def execute(self, context):
column_names = list(self.new_column_udfs.keys())
for column_name in column_names:
field = self.new_column_udfs[column_name]
source_schema_fields.append({"name": column_name, "type": field["output_type"]})
source_schema_fields.append(
{"name": column_name, "type": field["output_type"]}
)

if self.ods_table_config:
schema_xcom[
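
For reference, a short sketch of how the loop above extends the source schema; the column names, the types, and the assumption that each UDF config carries only an "output_type" key are illustrative:

new_column_udfs = {
    "email_domain": {"output_type": "STRING"},
    "signup_year": {"output_type": "INTEGER"},
}

source_schema_fields = [{"name": "customer_id", "type": "STRING"}]
for column_name in new_column_udfs:
    field = new_column_udfs[column_name]
    source_schema_fields.append({"name": column_name, "type": field["output_type"]})

# source_schema_fields now ends with:
#   {"name": "email_domain", "type": "STRING"}
#   {"name": "signup_year", "type": "INTEGER"}
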
2 changes: 1 addition & 1 deletion tests/integration/dlp/test_dlp.py
@@ -7,7 +7,6 @@
from airflow.utils.types import DagRunType
from bq_test_utils import insert_to_bq_from_dict
from datetime import datetime
from google.cloud import bigquery
from pytest_testconfig import config
from test_utils import cleanup_xcom, clear_db_dags

@@ -148,6 +147,7 @@ def create_dlp_dag(self, dag, project_id, dataset_id, target_table_id):
table_id=target_table_id,
dataset_id=dataset_id,
table_dlp_config=dlp_table_config,
location=config["gcp"]["location"],
next_task=done,
dag=dag,
)
4 changes: 1 addition & 3 deletions tests/test_utils/bq_test_utils.py
@@ -1,8 +1,6 @@
from google.cloud import bigquery
from google.cloud.bigquery import SchemaField
import pandas
from time import sleep
from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook


def insert_to_bq_from_csv(csv, project_id, dataset_id, table_id):
1 change: 1 addition & 0 deletions tests/unit/operators/dlp/test_dlp.py
@@ -138,6 +138,7 @@ def test_dlp_args(
table_id=TEST_TABLE_ID,
dataset_id=TEST_DATASET,
table_dlp_config=dlp_table_config,
location="us",
dag=self.dag,
)
