Merge pull request #135 from pycontw/feature/twitter-post-metrics
Feature/twitter post metrics
henry410213028 authored May 25, 2024
2 parents ee0e333 + 12a49b6 commit 3c37063
Showing 2 changed files with 206 additions and 0 deletions.
36 changes: 36 additions & 0 deletions dags/ods/twitter_post_insights/dags.py
@@ -0,0 +1,36 @@
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from ods.twitter_post_insights import udfs

DEFAULT_ARGS = {
"owner": "Henry Lee",
"depends_on_past": False,
"start_date": datetime(2023, 6, 14, 0),
"retries": 2,
"retry_delay": timedelta(minutes=5),
"on_failure_callback": lambda x: "Need to send notification to Discord!",
}
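
# NOTE: an editor's sketch, not part of this commit. The lambda above is a
# placeholder; a working callback could post the failure to a Discord webhook.
# It assumes the webhook URL lives in a hypothetical Airflow Variable named
# "DISCORD_WEBHOOK_URL".
def notify_discord_on_failure(context: dict) -> None:
    import requests
    from airflow.models import Variable

    ti = context["task_instance"]
    requests.post(
        Variable.get("DISCORD_WEBHOOK_URL"),
        json={"content": f"Task {ti.task_id} in DAG {ti.dag_id} failed!"},
        timeout=10,
    )
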
dag = DAG(
"TWITTER_POST_INSIGHTS_V1",
default_args=DEFAULT_ARGS,
schedule_interval="5 8 * * *",
max_active_runs=1,
catchup=False,
)
with dag:
CREATE_TABLE_IF_NEEDED = PythonOperator(
task_id="CREATE_TABLE_IF_NEEDED", python_callable=udfs.create_table_if_needed,
)

SAVE_TWITTER_POSTS_AND_INSIGHTS = PythonOperator(
task_id="SAVE_TWITTER_POSTS_AND_INSIGHTS",
python_callable=udfs.save_twitter_posts_and_insights,
)

CREATE_TABLE_IF_NEEDED >> SAVE_TWITTER_POSTS_AND_INSIGHTS


if __name__ == "__main__":
dag.cli()
170 changes: 170 additions & 0 deletions dags/ods/twitter_post_insights/udfs.py
@@ -0,0 +1,170 @@
import logging
import os
from datetime import datetime
from typing import List, Optional

import requests
from airflow.models import Variable
from google.cloud import bigquery

logger = logging.getLogger(__name__)


def create_table_if_needed() -> None:
client = bigquery.Client(project=os.getenv("BIGQUERY_PROJECT"))
post_sql = """
CREATE TABLE IF NOT EXISTS `pycontw-225217.ods.ods_pycontw_twitter_posts` (
id STRING,
created_at TIMESTAMP,
message STRING
)
"""
    client.query(post_sql).result()  # block until the DDL job finishes
insights_sql = """
CREATE TABLE IF NOT EXISTS `pycontw-225217.ods.ods_pycontw_twitter_posts_insights` (
post_id STRING,
query_time TIMESTAMP,
period STRING,
favorite INTEGER,
reply INTEGER,
retweet INTEGER,
views INTEGER
)
"""
    client.query(insights_sql).result()  # block until the DDL job finishes


def save_twitter_posts_and_insights() -> None:
posts = request_posts_data()

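    # Insert only posts newer than the most recent row already stored in BigQuery.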
last_post = query_last_post()
if last_post is None:
new_posts = posts
else:
new_posts = [
post
for post in posts
if post["timestamp"] > last_post["created_at"].timestamp()
]

if not dump_posts_to_bigquery(
[
{
"id": post["tweet_id"],
"created_at": post["timestamp"],
"message": post["text"],
}
for post in new_posts
]
):
raise RuntimeError("Failed to dump posts to BigQuery")

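    # Unlike post rows, insights are refreshed for every fetched post on each run.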
if not dump_posts_insights_to_bigquery(
[
{
"post_id": post["tweet_id"],
"query_time": datetime.now().timestamp(),
"period": "lifetime",
"favorite": post["favorite_count"],
"reply": post["reply_count"],
"retweet": post["retweet_count"],
"views": post["views"],
}
for post in posts
]
):
raise RuntimeError("Failed to dump posts insights to BigQuery")


def query_last_post() -> Optional[dict]:
client = bigquery.Client(project=os.getenv("BIGQUERY_PROJECT"))
sql = """
SELECT
created_at
FROM
`pycontw-225217.ods.ods_pycontw_twitter_posts`
ORDER BY
created_at DESC
LIMIT 1
"""
result = client.query(sql)
data = list(result)
return data[0] if data else None


def request_posts_data() -> List[dict]:
url = "https://twitter154.p.rapidapi.com/user/tweets"
    # 499339900 is PyConTW's Twitter user ID
querystring = {
"username": "pycontw",
"user_id": "499339900",
"limit": "40",
"include_replies": "false",
"include_pinned": "false",
}
headers = {
"X-RapidAPI-Key": Variable.get("RAPIDAPIAPI_KEY"),
"X-RapidAPI-Host": "twitter154.p.rapidapi.com",
}
response = requests.get(url, headers=headers, params=querystring)
if response.ok:
return response.json()["results"]
raise RuntimeError(f"Failed to fetch posts data: {response.text}")


def dump_posts_to_bigquery(posts: List[dict]) -> bool:
if not posts:
logger.info("No posts to dump!")
return True

client = bigquery.Client(project=os.getenv("BIGQUERY_PROJECT"))
job_config = bigquery.LoadJobConfig(
schema=[
bigquery.SchemaField("id", "STRING", mode="REQUIRED"),
bigquery.SchemaField("created_at", "TIMESTAMP", mode="REQUIRED"),
bigquery.SchemaField("message", "STRING", mode="REQUIRED"),
],
write_disposition="WRITE_APPEND",
)
try:
job = client.load_table_from_json(
posts,
"pycontw-225217.ods.ods_pycontw_twitter_posts",
job_config=job_config,
)
job.result()
return True
except Exception as e:
logger.error(f"Failed to dump posts to BigQuery: {e}", exc_info=True)
return False


def dump_posts_insights_to_bigquery(posts: List[dict]) -> bool:
if not posts:
logger.info("No post insights to dump!")
return True

client = bigquery.Client(project=os.getenv("BIGQUERY_PROJECT"))
job_config = bigquery.LoadJobConfig(
schema=[
bigquery.SchemaField("post_id", "STRING", mode="REQUIRED"),
bigquery.SchemaField("query_time", "TIMESTAMP", mode="REQUIRED"),
bigquery.SchemaField("period", "STRING", mode="REQUIRED"),
bigquery.SchemaField("favorite", "INTEGER", mode="NULLABLE"),
bigquery.SchemaField("reply", "INTEGER", mode="NULLABLE"),
bigquery.SchemaField("retweet", "INTEGER", mode="NULLABLE"),
bigquery.SchemaField("views", "INTEGER", mode="NULLABLE"),
],
write_disposition="WRITE_APPEND",
)
try:
job = client.load_table_from_json(
posts,
"pycontw-225217.ods.ods_pycontw_twitter_posts_insights",
job_config=job_config,
)
job.result()
return True
except Exception as e:
logger.error(f"Failed to dump posts insights to BigQuery: {e}", exc_info=True)
return False
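
For a quick smoke test outside the scheduler, both UDFs can be invoked directly. This is a sketch, not part of the commit: it assumes BigQuery credentials, the BIGQUERY_PROJECT environment variable, and the RAPIDAPIAPI_KEY Airflow Variable are already configured, and note that it calls the live API and appends to the production tables.

    # Hypothetical local run for verification only; not part of this commit.
    from ods.twitter_post_insights import udfs

    udfs.create_table_if_needed()
    udfs.save_twitter_posts_and_insights()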
