diff --git a/dags/ods/twitter_post_insights/dags.py b/dags/ods/twitter_post_insights/dags.py
new file mode 100644
index 0000000..3ae1f3f
--- /dev/null
+++ b/dags/ods/twitter_post_insights/dags.py
@@ -0,0 +1,36 @@
+from datetime import datetime, timedelta
+
+from airflow import DAG
+from airflow.operators.python_operator import PythonOperator
+from ods.twitter_post_insights import udfs
+
+DEFAULT_ARGS = {
+    "owner": "Henry Lee",
+    "depends_on_past": False,
+    "start_date": datetime(2023, 6, 14, 0),
+    "retries": 2,
+    "retry_delay": timedelta(minutes=5),
+    "on_failure_callback": lambda x: "Need to send notification to Discord!",  # placeholder; does not actually notify
+}
+dag = DAG(
+    "TWITTER_POST_INSIGHTS_V1",
+    default_args=DEFAULT_ARGS,
+    schedule_interval="5 8 * * *",  # daily at 08:05 UTC
+    max_active_runs=1,
+    catchup=False,
+)
+with dag:
+    CREATE_TABLE_IF_NEEDED = PythonOperator(
+        task_id="CREATE_TABLE_IF_NEEDED", python_callable=udfs.create_table_if_needed,
+    )
+
+    SAVE_TWITTER_POSTS_AND_INSIGHTS = PythonOperator(
+        task_id="SAVE_TWITTER_POSTS_AND_INSIGHTS",
+        python_callable=udfs.save_twitter_posts_and_insights,
+    )
+
+    CREATE_TABLE_IF_NEEDED >> SAVE_TWITTER_POSTS_AND_INSIGHTS
+
+
+if __name__ == "__main__":
+    dag.cli()
diff --git a/dags/ods/twitter_post_insights/udfs.py b/dags/ods/twitter_post_insights/udfs.py
new file mode 100644
index 0000000..a68405b
--- /dev/null
+++ b/dags/ods/twitter_post_insights/udfs.py
@@ -0,0 +1,170 @@
+import logging
+import os
+from datetime import datetime
+from typing import List, Optional
+
+import requests
+from airflow.models import Variable
+from google.cloud import bigquery
+
+logger = logging.getLogger(__name__)
+
+
+def create_table_if_needed() -> None:
+    client = bigquery.Client(project=os.getenv("BIGQUERY_PROJECT"))
+    post_sql = """
+    CREATE TABLE IF NOT EXISTS `pycontw-225217.ods.ods_pycontw_twitter_posts` (
+        id STRING,
+        created_at TIMESTAMP,
+        message STRING
+    )
+    """
+    client.query(post_sql).result()  # wait for the DDL job so errors surface before the next task runs
+    insights_sql = """
+    CREATE TABLE IF NOT EXISTS `pycontw-225217.ods.ods_pycontw_twitter_posts_insights` (
+        post_id STRING,
+        query_time TIMESTAMP,
+        period STRING,
+        favorite INTEGER,
+        reply INTEGER,
+        retweet INTEGER,
+        views INTEGER
+    )
+    """
+    client.query(insights_sql).result()
+
+
+def save_twitter_posts_and_insights() -> None:
+    posts = request_posts_data()
+
+    last_post = query_last_post()
+    if last_post is None:
+        new_posts = posts
+    else:
+        new_posts = [
+            post
+            for post in posts
+            if post["timestamp"] > last_post["created_at"].timestamp()
+        ]
+
+    if not dump_posts_to_bigquery(
+        [
+            {
+                "id": post["tweet_id"],
+                "created_at": post["timestamp"],
+                "message": post["text"],
+            }
+            for post in new_posts
+        ]
+    ):
+        raise RuntimeError("Failed to dump posts to BigQuery")
+
+    if not dump_posts_insights_to_bigquery(
+        [
+            {
+                "post_id": post["tweet_id"],
+                "query_time": datetime.now().timestamp(),
+                "period": "lifetime",
+                "favorite": post["favorite_count"],
+                "reply": post["reply_count"],
+                "retweet": post["retweet_count"],
+                "views": post["views"],
+            }
+            for post in posts
+        ]
+    ):
+        raise RuntimeError("Failed to dump posts insights to BigQuery")
+
+
+def query_last_post() -> Optional[dict]:
+    client = bigquery.Client(project=os.getenv("BIGQUERY_PROJECT"))
+    sql = """
+    SELECT
+        created_at
+    FROM
+        `pycontw-225217.ods.ods_pycontw_twitter_posts`
+    ORDER BY
+        created_at DESC
+    LIMIT 1
+    """
+    result = client.query(sql)
+    data = list(result)
+    return data[0] if data else None
+
+
+def request_posts_data() -> List[dict]:
+    url = "https://twitter154.p.rapidapi.com/user/tweets"
+    # 499339900 is PyConTW's twitter id
+    querystring = {
+        "username": "pycontw",
+        "user_id": "499339900",
+        "limit": "40",
+        "include_replies": "false",
+        "include_pinned": "false",
+    }
+    headers = {
+        "X-RapidAPI-Key": Variable.get("RAPIDAPIAPI_KEY"),
+        "X-RapidAPI-Host": "twitter154.p.rapidapi.com",
+    }
+    response = requests.get(url, headers=headers, params=querystring)
+    if response.ok:
+        return response.json()["results"]
+    raise RuntimeError(f"Failed to fetch posts data: {response.text}")
+
+
+def dump_posts_to_bigquery(posts: List[dict]) -> bool:
+    if not posts:
+        logger.info("No posts to dump!")
+        return True
+
+    client = bigquery.Client(project=os.getenv("BIGQUERY_PROJECT"))
+    job_config = bigquery.LoadJobConfig(
+        schema=[
+            bigquery.SchemaField("id", "STRING", mode="REQUIRED"),
+            bigquery.SchemaField("created_at", "TIMESTAMP", mode="REQUIRED"),
+            bigquery.SchemaField("message", "STRING", mode="REQUIRED"),
+        ],
+        write_disposition="WRITE_APPEND",
+    )
+    try:
+        job = client.load_table_from_json(
+            posts,
+            "pycontw-225217.ods.ods_pycontw_twitter_posts",
+            job_config=job_config,
+        )
+        job.result()
+        return True
+    except Exception as e:
+        logger.error(f"Failed to dump posts to BigQuery: {e}", exc_info=True)
+        return False
+
+
+def dump_posts_insights_to_bigquery(posts: List[dict]) -> bool:
+    if not posts:
+        logger.info("No post insights to dump!")
+        return True
+
+    client = bigquery.Client(project=os.getenv("BIGQUERY_PROJECT"))
+    job_config = bigquery.LoadJobConfig(
+        schema=[
+            bigquery.SchemaField("post_id", "STRING", mode="REQUIRED"),
+            bigquery.SchemaField("query_time", "TIMESTAMP", mode="REQUIRED"),
+            bigquery.SchemaField("period", "STRING", mode="REQUIRED"),
+            bigquery.SchemaField("favorite", "INTEGER", mode="NULLABLE"),
+            bigquery.SchemaField("reply", "INTEGER", mode="NULLABLE"),
+            bigquery.SchemaField("retweet", "INTEGER", mode="NULLABLE"),
+            bigquery.SchemaField("views", "INTEGER", mode="NULLABLE"),
+        ],
+        write_disposition="WRITE_APPEND",
+    )
+    try:
+        job = client.load_table_from_json(
+            posts,
+            "pycontw-225217.ods.ods_pycontw_twitter_posts_insights",
+            job_config=job_config,
+        )
+        job.result()
+        return True
+    except Exception as e:
+        logger.error(f"Failed to dump posts insights to BigQuery: {e}", exc_info=True)
+        return False
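
For reviewers: the incremental-load behaviour in save_twitter_posts_and_insights can be sanity-checked without Airflow or BigQuery. The sketch below replicates the filter with a fabricated twitter154-style payload (field names come from the diff; the tweet ids, timestamps, and counts are made up for illustration):

    from datetime import datetime, timezone

    # Pretend this came back from request_posts_data()
    fake_posts = [
        {"tweet_id": "111", "timestamp": 1686700000, "text": "older tweet",
         "favorite_count": 5, "reply_count": 1, "retweet_count": 2, "views": 100},
        {"tweet_id": "222", "timestamp": 1686800000, "text": "newer tweet",
         "favorite_count": 9, "reply_count": 0, "retweet_count": 4, "views": 250},
    ]

    # Pretend query_last_post() returned a row whose created_at falls between the two
    last_created_at = datetime.fromtimestamp(1686750000, tz=timezone.utc)

    # Same filter as save_twitter_posts_and_insights(): only strictly-newer
    # posts become rows in ods_pycontw_twitter_posts ...
    new_posts = [p for p in fake_posts if p["timestamp"] > last_created_at.timestamp()]
    assert [p["tweet_id"] for p in new_posts] == ["222"]

    # ... while an insights row is snapshotted for *every* fetched post, so each
    # daily run appends a fresh "lifetime" metrics row per tweet.
    insight_rows = [
        {"post_id": p["tweet_id"], "query_time": datetime.now().timestamp(),
         "period": "lifetime", "favorite": p["favorite_count"],
         "reply": p["reply_count"], "retweet": p["retweet_count"],
         "views": p["views"]}
        for p in fake_posts
    ]
    assert len(insight_rows) == 2

That asymmetry appears intentional: the posts table stays deduplicated by only appending posts newer than the stored high-water mark, while the insights table accumulates one snapshot per post per run.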
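
A minimal wiring check for the DAG itself, assuming an environment with Airflow installed and dags/ on PYTHONPATH (dag and task ids as declared in the diff):

    from ods.twitter_post_insights.dags import dag

    assert dag.dag_id == "TWITTER_POST_INSIGHTS_V1"
    # CREATE_TABLE_IF_NEEDED must run before SAVE_TWITTER_POSTS_AND_INSIGHTS
    upstream = dag.get_task("SAVE_TWITTER_POSTS_AND_INSIGHTS").upstream_task_ids
    assert "CREATE_TABLE_IF_NEEDED" in upstream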