Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Import google spreadsheet into bigquery #5

Merged
merged 17 commits into from
Jan 29, 2020
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions dags/google_spreadsheet_import_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import os
import logging
from datetime import timedelta
from airflow import DAG

from data_pipeline.utils.dags.data_pipeline_dag_utils import (
get_default_args,
create_python_task,
)
from data_pipeline.spreadsheet_data.google_spreadsheet_config import (
MultiCsvSheet
)
from data_pipeline.spreadsheet_data.google_spreadsheet_etl import (
etl_google_spreadsheet
)

# Module-level logger for this DAG-definition file.
# NOTE(review): not referenced in the visible code — presumably kept for
# future use; confirm before removing.
LOGGER = logging.getLogger(__name__)

# Environment variable naming the deployment environment (e.g. "ci");
# read in google_spreadsheet_data_etl with the default below as fallback.
DEPLOYMENT_ENV_ENV_NAME = "DEPLOYMENT_ENV"
DEFAULT_DEPLOYMENT_ENV_VALUE = "ci"

# Externally-triggered ETL DAG: schedule_interval=None means it only runs
# when triggered (by the controller DAG or manually) with a spreadsheet
# config supplied via dag_run.conf.
DAG_ID = "Google_Spreadsheet_Data_Pipeline"
G_SPREADSHEET_DAG = DAG(
    dag_id=DAG_ID,
    default_args=get_default_args(),
    schedule_interval=None,  # triggered, never scheduled
    dagrun_timeout=timedelta(minutes=60),  # abort runs exceeding one hour
)


def get_env_var_or_use_default(env_var_name, default_value):
    """Return the value of environment variable *env_var_name*,
    or *default_value* when the variable is not set."""
    return os.environ.get(env_var_name, default_value)


def google_spreadsheet_data_etl(**kwargs):
    """Run the spreadsheet-to-BigQuery ETL for the config in dag_run.conf.

    The triggering DAG passes a per-spreadsheet config dict through
    ``dag_run.conf``; the deployment environment comes from the
    DEPLOYMENT_ENV variable (default "ci").
    """
    deployment_env = get_env_var_or_use_default(
        DEPLOYMENT_ENV_ENV_NAME, DEFAULT_DEPLOYMENT_ENV_VALUE
    )
    config_dict = kwargs["dag_run"].conf
    etl_google_spreadsheet(MultiCsvSheet(config_dict, deployment_env))


# Register the single ETL task on the DAG; create_python_task wires
# google_spreadsheet_data_etl in as the task's callable.
G_SPREADSHEET_ETL_TASK = create_python_task(
    G_SPREADSHEET_DAG, "google_spreadsheet_data_etl",
    google_spreadsheet_data_etl,
)
57 changes: 57 additions & 0 deletions dags/google_spreadsheet_pipeline_controller.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import os

from airflow import DAG

from data_pipeline.spreadsheet_data.google_spreadsheet_config import (
MultiSpreadsheetConfig,
)
from data_pipeline.spreadsheet_data.google_spreadsheet_etl import (
get_yaml_file_as_dict
)
from data_pipeline.utils.dags.data_pipeline_dag_utils import (
get_default_args,
simple_trigger_dag,
create_python_task
)

# Env var holding the controller DAG's schedule expression; when unset the
# DAG gets schedule_interval=None (manual trigger only).
GOOGLE_SPREADSHEET_SCHEDULE_INTERVAL_ENV_NAME = (
    "GOOGLE_SPREADSHEET_SCHEDULE_INTERVAL"
)
# Env var pointing at the YAML file listing the spreadsheets to import.
SPREADSHEET_CONFIG_FILE_PATH_ENV_NAME = (
    "SPREADSHEET_CONFIG_FILE_PATH"
)

# NOTE(review): these two constants are not referenced in this module —
# possibly copied from the pipeline DAG file; confirm before removing.
DEPLOYMENT_ENV_ENV_NAME = "DEPLOYMENT_ENV"
DEFAULT_DEPLOYMENT_ENV_VALUE = "ci"

# dag_id of the ETL DAG this controller triggers
# (defined in google_spreadsheet_import_pipeline.py).
TARGET_DAG = "Google_Spreadsheet_Data_Pipeline"


def get_env_var_or_use_default(env_var_name, default_value=None):
    """Return the value of environment variable *env_var_name*,
    or *default_value* (None by default) when the variable is not set."""
    return os.environ.get(env_var_name, default_value)


# pylint: disable=unused-argument
def trigger_dag(**kwargs):
    """Trigger one ETL DAG run per configured spreadsheet.

    Reads the multi-spreadsheet YAML config from the path named by the
    SPREADSHEET_CONFIG_FILE_PATH env var, then fires TARGET_DAG once per
    spreadsheet, passing that spreadsheet's (extended) config dict as
    ``dag_run.conf``.
    """
    # NOTE(review): an unset env var yields path "" — presumably
    # get_yaml_file_as_dict fails loudly on that; confirm.
    conf_file_path = get_env_var_or_use_default(
        SPREADSHEET_CONFIG_FILE_PATH_ENV_NAME, ""
    )
    data_config_dict = get_yaml_file_as_dict(conf_file_path)

    data_config = MultiSpreadsheetConfig(data_config_dict)
    # Only the per-spreadsheet configs are needed here, not the
    # spreadsheet-id keys, so iterate .values() instead of .items().
    for spreadsheet_config in data_config.spreadsheets_config.values():
        simple_trigger_dag(dag_id=TARGET_DAG, conf=spreadsheet_config)


# Controller DAG: on its schedule (taken from the env var; None when unset,
# i.e. manual trigger only) it fans out one triggered run of the ETL DAG
# per configured spreadsheet.
SPREADSHEET_CONTROLLER_DAG = DAG(
    dag_id="Google_Spreadsheet_Import_Pipeline_Controller",
    default_args=get_default_args(),
    schedule_interval=get_env_var_or_use_default(
        GOOGLE_SPREADSHEET_SCHEDULE_INTERVAL_ENV_NAME
    ),
)

# Single task of the controller DAG: runs trigger_dag above.
# NOTE(review): "SPEADSHEET" is a typo for "SPREADSHEET"; left unchanged in
# case other modules reference this name.
TRIGGER_SPEADSHEET_ETL_DAG_TASK = create_python_task(
    SPREADSHEET_CONTROLLER_DAG, "trigger_google_spreadsheet_etl_dag",
    trigger_dag,
)
Empty file.
92 changes: 92 additions & 0 deletions data_pipeline/spreadsheet_data/google_spreadsheet_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
class MultiSpreadsheetConfig:
    """Top-level config for a multi-spreadsheet import.

    Wraps the parsed YAML dict: records the shared GCP project name and
    imported-timestamp field name, and builds a mapping of
    spreadsheet id -> per-spreadsheet config dict extended with those
    shared values.
    """

    def __init__(self,
                 multi_spreadsheet_config: dict,
                 ):
        self.gcp_project = multi_spreadsheet_config.get("gcpProjectName")
        self.import_timestamp_field_name = multi_spreadsheet_config.get(
            "importedTimestampFieldName"
        )
        # Default to an empty list so a config with no "spreadsheets"
        # section produces an empty mapping instead of a TypeError from
        # iterating None.
        self.spreadsheets_config = {
            spreadsheet.get("spreadsheetId"): extend_spreadsheet_config_dict(
                spreadsheet,
                self.gcp_project,
                self.import_timestamp_field_name,
            )
            for spreadsheet in multi_spreadsheet_config.get("spreadsheets", [])
        }


def extend_spreadsheet_config_dict(
        spreadsheet_config_dict,
        gcp_project: str,
        imported_timestamp_field_name: str,
):
    """Copy the shared GCP project name and imported-timestamp field name
    into a per-spreadsheet config dict (mutated in place) and return it."""
    spreadsheet_config_dict.update(
        gcpProjectName=gcp_project,
        importedTimestampFieldName=imported_timestamp_field_name,
    )
    return spreadsheet_config_dict


class MultiCsvSheet:
    """Config for one spreadsheet: shared identifiers plus per-sheet
    CsvSheetConfig objects keyed by sheet name."""

    def __init__(self, multi_sheet_config: dict,
                 deployment_env: str,
                 ):
        self.spreadsheet_id = multi_sheet_config.get("spreadsheetId")
        self.gcp_project = multi_sheet_config.get("gcpProjectName")
        self.import_timestamp_field_name = multi_sheet_config.get(
            "importedTimestampFieldName"
        )
        # One CsvSheetConfig per entry in "sheets", keyed by its sheetName.
        self.sheets_config = {}
        for sheet_config in multi_sheet_config.get("sheets"):
            name = sheet_config.get("sheetName")
            self.sheets_config[name] = CsvSheetConfig(
                sheet_config,
                self.spreadsheet_id,
                self.gcp_project,
                self.import_timestamp_field_name,
                deployment_env,
            )


# pylint: disable=too-many-instance-attributes,too-many-arguments,
# pylint: disable=simplifiable-if-expression
class CsvSheetConfig:
    """Config for importing one CSV sheet of a spreadsheet into BigQuery.

    Combines the per-sheet config dict with spreadsheet-level values
    (spreadsheet id, GCP project, timestamp field name) and substitutes
    the deployment environment into the dataset name.
    """

    def __init__(
            self,
            csv_sheet_config: dict,
            spreadsheet_id: str,
            gcp_project: str,
            imported_timestamp_field_name: str,
            deployment_env: str,
            environment_placeholder: str = "{ENV}"
    ):
        self.gcp_project = gcp_project
        self.import_timestamp_field_name = imported_timestamp_field_name
        self.spreadsheet_id = spreadsheet_id
        # Line indices into the downloaded sheet data.
        self.header_line_index = csv_sheet_config.get("headerLineIndex")
        self.data_values_start_line_index = csv_sheet_config.get(
            "dataValuesStartLineIndex"
        )
        self.sheet_name = csv_sheet_config.get("sheetName")
        self.sheet_range = csv_sheet_config.get("sheetRange")
        self.table_name = csv_sheet_config.get("tableName")
        # "datasetName" is effectively required: a missing key would make
        # .get() return None and .replace raise AttributeError here.
        # The placeholder (default "{ENV}") is replaced with deployment_env.
        self.dataset_name = csv_sheet_config.get(
            "datasetName"
        ).replace(environment_placeholder, deployment_env)
        # Expects the *string* "true" (case-insensitive) to enable append
        # mode; any other value (or a missing key) means overwrite.
        # Simplified from a `True if ... else False` expression.
        self.table_write_append = (
            csv_sheet_config.get("tableWriteAppend", "").lower() == "true"
        )
        # metadataSchemaFieldName -> sheet line index holding that metadata.
        self.metadata = {
            record.get("metadataSchemaFieldName"):
                record.get("metadataLineIndex")
            for record in csv_sheet_config.get("metadata", [])
        }
        # metadataSchemaFieldName -> fixed value for every imported row.
        self.fixed_sheet_metadata = {
            record.get("metadataSchemaFieldName"):
                record.get("fixedSheetValue")
            for record in csv_sheet_config.get("fixedSheetMetadata", [])
        }
Loading