
Add KKTIX attendee Info ETL to BigQuery pycontw-225217.dwd.kktix_ticket_xxxx_attendees tables #123


Merged
merged 8 commits into master from kktix_etl_3tables on Aug 12, 2023

Conversation

GeekAngus
Collaborator

Types of changes

  • New feature

Description

Background: Starting from 2022, we extract the KKTIX data via the KKTIX API and load it into "pycontw-225217.ods.ods_kktix_attendeeId_datetime". However, most of the data is stored in the ATTENDEE_INFO column in JSON format. To use Metabase with SQL, users have to extract fields with json_extract and know the KKTIX payload format, instead of querying a flat table, and we would also need to rewrite all the SQL queries built for the current tables.
Solution: Transform the tables in the backend so that we can keep the same user experience in Metabase.
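
For illustration, the core of the transform is flattening each ATTENDEE_INFO JSON payload into ordinary columns. Below is a minimal sketch with pandas; the field names and payload shape are illustrative assumptions, not the exact KKTIX schema:

    import json
    import pandas as pd

    # Hypothetical rows as stored in ods_kktix_attendeeId_datetime: the useful
    # fields are packed inside the ATTENDEE_INFO JSON string.
    rows = [
        {"attendee_id": 1, "ATTENDEE_INFO": json.dumps({"ticket_type": "individual", "email": "a@example.com"})},
        {"attendee_id": 2, "ATTENDEE_INFO": json.dumps({"ticket_type": "corporate", "email": "b@example.com"})},
    ]

    df = pd.DataFrame(rows)
    # Parse the JSON column and flatten it into top-level columns.
    info = pd.json_normalize(df["ATTENDEE_INFO"].map(json.loads).tolist())
    flat = pd.concat([df.drop(columns=["ATTENDEE_INFO"]), info], axis=1)
    print(flat)  # columns: attendee_id, ticket_type, email — no json_extract needed

Once rows are flattened like this and loaded into the dwd tables, existing Metabase queries can reference the columns directly.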

Checklist:

  • Add test cases to all the changes you introduce
  • Run `poetry run pytest` locally to ensure all tests pass
  • Update the documentation if necessary

Steps to Test This Pull Request

cd contrib
./kktix_bq_etl.sh 2023

Expected behavior

The data has been loaded into dwd.kktix_ticket_${ticket_type}_attendees_test on BigQuery, where ticket_type is one of corporate, individual, or reserved.
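
To spot-check the result, one option is to count rows per test table. This is a sketch under the assumption that GOOGLE_APPLICATION_CREDENTIALS is configured and the tables above exist; it is not part of this PR:

    from google.cloud import bigquery

    client = bigquery.Client(project="pycontw-225217")
    for ticket_type in ("corporate", "individual", "reserved"):
        table = f"pycontw-225217.dwd.kktix_ticket_{ticket_type}_attendees_test"
        # Each table should contain the flattened attendee rows for its ticket type.
        row = next(iter(client.query(f"SELECT COUNT(*) AS n FROM `{table}`").result()))
        print(table, row.n)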

Comment on lines +20 to +26

## KKTIX BigQuery Transform
1. Background: Starting from 2022, we extract the KKTIX data via the KKTIX API and load it into "pycontw-225217.ods.ods_kktix_attendeeId_datetime". However, most of the data is stored in the ATTENDEE_INFO column in JSON format. To use Metabase with SQL, users have to extract fields with json_extract and know the KKTIX payload format, instead of querying a flat table, and we would also need to rewrite all the SQL queries built for the current tables.
2. Solution: Transform the tables in the backend so that we can keep the same user experience in Metabase.
3. Run:
- for 3 tables in a single bash script: `./kktix_bq_etl.sh 2023`
Collaborator

thx for documenting these up 🙏

@@ -28,6 +30,7 @@ def load(event_raw_data_array: List):
sanitized_event_raw_data = _sanitize_payload(event_raw_data)
payload.append(sanitized_event_raw_data)
_load_to_bigquery(payload)
_load_to_bigquery_dwd(payload)
Collaborator

👍

Comment on lines +643 to +646
# print(sanitized_df.columns)
# print(sanitized_df.head())
# df_null = sanitized_df.isnull()
# print(sanitized_df.iloc[:, :5])
Collaborator

should we remove these comments?

Comment on lines +490 to +493
def load_to_df_from_list(
results, source="dag", update_after_ts=0
) -> Tuple[DataFrame, DataFrame]:
# Use DataFrame for table transform operations
Collaborator

seems to me that you should rename this function to transform_xxx()?

JOB_CONFIG = bigquery.LoadJobConfig(schema=SCHEMA)


def _load_row_df_from_dict(json_dict, update_after_ts) -> DataFrame:
Collaborator

ditto

Comment on lines +55 to +56
CANONICAL_COLUMN_NAMES_2020_EXTRA_CORPORATE = {
"invoice_policy",
Collaborator

unused constants?

Comment on lines +1 to +15
#!/bin/bash
#
# export GOOGLE_APPLICATION_CREDENTIALS="<where to access service-account.json>"
#
project_id="pycontw-225217"
cmd=${PWD}/../dags/ods/kktix_ticket_orders/udfs/kktix_bq_dwd_etl.py


for ticket_type in corporate individual reserved
do
suffix=${ticket_type}_attendees$2
cmd_args="-p ${project_id} -d dwd -t kktix_ticket_${suffix} -k ${ticket_type} -y $1 --upload"
echo ${cmd_args}
${cmd} ${cmd_args}
done
Collaborator

👍

@@ -43,6 +46,30 @@ def _load_to_bigquery(payload: List[Dict]) -> None:
job.result()


def _load_to_bigquery_dwd(payload: List[Dict]) -> None:
Collaborator

sorry for the nit picking, but seems to me that this function is more like transform than load

@david30907d (Collaborator) left a comment

Generally, it looks good to me. I just have some suggestions regarding the design aspect.

So feel free to ship it, and we can discuss the design pattern afterwards.

About the Design Pattern

Here are my 2 cents:

  1. Follow the ETL or ELT paradigm (for example: https://github.com/pycontw/pycon-etl/blob/master/dags/ods/kktix_ticket_orders/udfs/kktix_api.py#L30-L34)
  2. Follow the single responsibility principle: since _load_to_bigquery_dwd() is actually doing transformation, load() below is no longer following the single responsibility principle. I would suggest using an independent operator for this dwd operation.

        def load(event_raw_data_array: List):
            """
            load data into bigquery!
            """
            # data quality check
            if len(event_raw_data_array) == 0:
                print("Nothing to load, skip!")
                return
            payload = []
            for event_raw_data in event_raw_data_array:
                sanitized_event_raw_data = _sanitize_payload(event_raw_data)
                payload.append(sanitized_event_raw_data)
            _load_to_bigquery(payload)
            _load_to_bigquery_dwd(payload)

  3. Make the pipeline idempotent if possible. Since you're using WriteDisposition.WRITE_APPEND right now, we might end up with duplicate data if someone re-runs a partially failed job (see the sketch below).
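
To make the idempotency point concrete, here is a minimal sketch (the table name and schema are illustrative assumptions, not this PR's code): using WRITE_TRUNCATE instead of WRITE_APPEND makes a re-run replace previously loaded rows rather than duplicate them.

    from google.cloud import bigquery

    client = bigquery.Client(project="pycontw-225217")
    job_config = bigquery.LoadJobConfig(
        schema=[bigquery.SchemaField("attendee_id", "INTEGER")],  # illustrative schema
        # WRITE_TRUNCATE replaces the table contents on every run, so a re-run
        # of a partially failed job cannot leave duplicate rows behind.
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
    )
    payload = [{"attendee_id": 1}]  # rows produced by the transform step
    job = client.load_table_from_json(
        payload,
        "pycontw-225217.dwd.kktix_ticket_individual_attendees_test",
        job_config=job_config,
    )
    job.result()  # wait for the load job to finish

For incremental loads, a partition-scoped delete-then-insert achieves the same safety without rewriting the whole table.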

@GeekAngus GeekAngus merged commit dc751a5 into master Aug 12, 2023
@GeekAngus GeekAngus deleted the kktix_etl_3tables branch August 12, 2023 16:00