Add KKTIX attendee Info ETL to BigQuery pycontw-225217.dwd.kktix_ticket_xxxx_attendees tables #123
Conversation
Transform the "ods_kktix_attendeeId_datetime" table and load to the legacy tables: ods_kktix_ticket_(corporate, individual, reserved)_attendees
## KKTIX BigQuery Transform
1. Background: Starting from 2022, we extract the KKTIX data via the KKTIX API and load it to "pycontw-225217.ods.ods_kktix_attendeeId_datetime". However, most of the data is stored in the ATTENDEE_INFO column in JSON format. To use Metabase with SQL, users need to extract the data with json_extract, which requires knowledge of the KKTIX format, instead of querying a flat table. We would also need to rewrite all the SQL queries built for the current tables.
2. Solution: Transform the tables in the backend so that we can keep the same user experience when using Metabase.
3. Run:
   - for 3 tables in a single bash script: `./kktix_bq_etl.sh 2023`
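The flattening described in the background can be sketched as below. This is a minimal illustration of turning a JSON-encoded ATTENDEE_INFO column into flat columns; the field names (`ticket_name`, `email`) are assumptions for illustration, not the actual KKTIX schema.

```python
import json


def flatten_attendee_info(row: dict) -> dict:
    """Flatten the JSON-encoded ATTENDEE_INFO column into top-level columns,
    so downstream Metabase SQL no longer needs json_extract calls."""
    info = json.loads(row["ATTENDEE_INFO"])
    flat = {k: v for k, v in row.items() if k != "ATTENDEE_INFO"}
    # Hypothetical field names -- the real KKTIX payload keys may differ.
    flat["ticket_name"] = info.get("ticket_name")
    flat["attendee_email"] = info.get("email")
    return flat


raw = {"id": 1, "ATTENDEE_INFO": json.dumps({"ticket_name": "Corporate", "email": "a@b.c"})}
print(flatten_attendee_info(raw))
```

With rows shaped like this, a `SELECT ticket_name FROM ...` works directly instead of a json_extract over ATTENDEE_INFO.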
thx for documenting these up 🙏
@@ -28,6 +30,7 @@ def load(event_raw_data_array: List):
        sanitized_event_raw_data = _sanitize_payload(event_raw_data)
        payload.append(sanitized_event_raw_data)
    _load_to_bigquery(payload)
    _load_to_bigquery_dwd(payload)
👍
    # print(sanitized_df.columns)
    # print(sanitized_df.head())
    # df_null = sanitized_df.isnull()
    # print(sanitized_df.iloc[:, :5])
should we remove these comments?
def load_to_df_from_list(
    results, source="dag", update_after_ts=0
) -> Tuple[DataFrame, DataFrame]:
    # Use DataFrame for table transform operations
seems to me that you should rename this function to transform_xxx()?
JOB_CONFIG = bigquery.LoadJobConfig(schema=SCHEMA)


def _load_row_df_from_dict(json_dict, update_after_ts) -> DataFrame:
ditto
CANONICAL_COLUMN_NAMES_2020_EXTRA_CORPORATE = {
    "invoice_policy",
unused constants?
#!/bin/bash
#
# export GOOGLE_APPLICATION_CREDENTIALS="<where to access service-account.json>"
#
project_id="pycontw-225217"
cmd=${PWD}/../dags/ods/kktix_ticket_orders/udfs/kktix_bq_dwd_etl.py


for ticket_type in corporate individual reserved
do
    suffix=${ticket_type}_attendees$2
    cmd_args="-p ${project_id} -d dwd -t kktix_ticket_${suffix} -k ${ticket_type} -y $1 --upload"
    echo ${cmd_args}
    ${cmd} ${cmd_args}
done
👍
@@ -43,6 +46,30 @@ def _load_to_bigquery(payload: List[Dict]) -> None:
    job.result()


def _load_to_bigquery_dwd(payload: List[Dict]) -> None:
sorry for the nit-picking, but it seems to me that this function is more of a transform than a load
Generally, it looks good to me. I just have some suggestions regarding the design aspect. So feel free to ship it, and we can discuss the design pattern afterwards.

About the Design Pattern

Here are my 2 cents:
- Follow the ETL or ELT paradigm (for example: https://github.com/pycontw/pycon-etl/blob/master/dags/ods/kktix_ticket_orders/udfs/kktix_api.py#L30-L34)
- Follow the single responsibility principle: since `_load_to_bigquery_dwd()` is actually doing transformation (pycon-etl/dags/ods/kktix_ticket_orders/udfs/kktix_loader.py, lines 20 to 33 in cd68ec1):

      def load(event_raw_data_array: List):
          """
          load data into bigquery!
          """
          # data quality check
          if len(event_raw_data_array) == 0:
              print("Nothing to load, skip!")
              return
          payload = []
          for event_raw_data in event_raw_data_array:
              sanitized_event_raw_data = _sanitize_payload(event_raw_data)
              payload.append(sanitized_event_raw_data)
          _load_to_bigquery(payload)
          _load_to_bigquery_dwd(payload)

  I would suggest using an independent operator for this dwd operation.
- Make the pipeline idempotent if possible. Since you're using WriteDisposition.WRITE_APPEND right now, it means we might have duplicate data if someone re-runs a partially failed job.
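One hedged way to get the idempotency asked for above, assuming each row carries a stable attendee key: deduplicate against already-loaded rows before appending (alternatively, WRITE_TRUNCATE on a year-scoped table makes re-runs safe by construction). A stdlib-only sketch, with hypothetical key names:

```python
from typing import Dict, List


def dedupe_for_rerun(existing: List[Dict], incoming: List[Dict], key: str = "attendee_id") -> List[Dict]:
    """Drop incoming rows whose key already exists in the target table,
    so re-running a partially failed WRITE_APPEND job adds no duplicates."""
    seen = {row[key] for row in existing}
    return [row for row in incoming if row[key] not in seen]


# A re-run that already loaded attendee 1 only appends attendee 2.
existing = [{"attendee_id": 1}]
incoming = [{"attendee_id": 1}, {"attendee_id": 2}]
print(dedupe_for_rerun(existing, incoming))
```

In practice the `existing` keys would come from a cheap SELECT on the target table before the load job runs.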
Types of changes
Description
Background: Starting from 2022, we extract the KKTIX data via the KKTIX API and load it to "pycontw-225217.ods.ods_kktix_attendeeId_datetime". However, most of the data is stored in the ATTENDEE_INFO column in JSON format. To use Metabase with SQL, users need to extract the data with json_extract, which requires knowledge of the KKTIX format, instead of querying a flat table. We would also need to rewrite all the SQL queries built for the current tables.
Solution: Transform the tables in the backend so that we can keep the same user experience when using Metabase.
Checklist:
Run `poetry run pytest` locally to ensure all linter checks pass

Steps to Test This Pull Request
cd contrib
./kktix_bq_etl.sh 2023
Expected behavior
The data has been loaded to dwd.kktix_ticket_${ticket_type}_attendees_test on BigQuery, where ticket_type is one of: corporate, individual, reserved