Merge pull request #148 from pycontw/linkedin_insight
[feat](linkedin dag): add LinkedIn insight dag into airflow
henry410213028 authored Sep 1, 2024
2 parents 9167206 + f522b51 commit 5c69500
Showing 3 changed files with 287 additions and 45 deletions.
70 changes: 25 additions & 45 deletions .github/workflows/dockerimage.yml
@@ -1,65 +1,45 @@
name: Docker Image CI

on:
push:
branches: [ master, prod ]
pull_request:
branches: [ master, prod ]
env:
RC_NAME: davidtnfsh/pycon_etl

RC_NAME: asia-east1-docker.pkg.dev/${{ secrets.GCP_PROJECT_ID }}/data-team/pycon-etl
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Login to docker hub
uses: actions-hub/docker/login@master
env:
DOCKER_USERNAME: ${{ secrets.DOCKER_USERNAME }}
DOCKER_PASSWORD: ${{ secrets.DOCKER_PASSWORD }}

- uses: actions/checkout@v4
- name: Authenticate to Google Cloud
uses: google-github-actions/auth@v1
with:
credentials_json: ${{ secrets.GCP_SERVICE_ACCOUNT_KEY }}
- name: Configure docker to use gcloud command-line tool as a credential helper
run: |
gcloud auth configure-docker asia-east1-docker.pkg.dev
- name: Pull cache
run: |
docker login -u ${{ secrets.DOCKER_USERNAME }} -p ${{ secrets.DOCKER_PASSWORD }}
docker pull ${RC_NAME}:cache
docker pull ${RC_NAME}:cache || true
- name: Build the Docker image
if: always()
run: |
docker build -t ${RC_NAME}:${GITHUB_SHA} --cache-from ${RC_NAME}:cache .
docker tag ${RC_NAME}:${GITHUB_SHA} ${RC_NAME}:cache
docker build -t ${RC_NAME}:cache --cache-from ${RC_NAME}:cache .
docker build -t ${RC_NAME}:test --cache-from ${RC_NAME}:cache -f Dockerfile.test .
docker tag ${RC_NAME}:${GITHUB_SHA} ${RC_NAME}:staging
docker tag ${RC_NAME}:${GITHUB_SHA} ${RC_NAME}:latest
- name: Run test
run: |
docker run -d --rm -p 8080:8080 --name airflow -v $(pwd)/dags:/usr/local/airflow/dags -v $(pwd)/fixtures:/usr/local/airflow/fixtures ${RC_NAME}:test webserver
docker run -d --rm -p 8080:8080 --name airflow -v $(pwd)/dags:/opt/airflow/dags -v $(pwd)/fixtures:/opt/airflow/fixtures ${RC_NAME}:test webserver
sleep 10
docker exec airflow bash -c "airflow test OPENING_CRAWLER_V1 CRAWLER 2020-01-01"
docker exec airflow bash -c "airflow test QUESTIONNAIRE_2_BIGQUERY TRANSFORM_data_questionnaire 2020-09-29"
- name: Push Cache to docker registry
uses: actions-hub/docker@master
if: always()
with:
args: push ${RC_NAME}:cache

- name: Push GITHUB_SHA to docker registry
uses: actions-hub/docker@master
if: always()
with:
args: push ${RC_NAME}:${GITHUB_SHA}

- name: Push staging to docker registry
uses: actions-hub/docker@master
if: ${{ github.ref == 'refs/heads/master' }} && success()
with:
args: push ${RC_NAME}:staging

- name: Push prod version to docker registry
uses: actions-hub/docker@master
- name: Push cache to Google Container Registry
if: success()
run: |
docker push ${RC_NAME}:cache
- name: Push staging to Google Container Registry
if: github.ref == 'refs/heads/master' && success()
run: |
docker tag ${RC_NAME}:cache ${RC_NAME}:staging
docker push ${RC_NAME}:staging
- name: Push prod version to Google Container Registry
if: github.ref == 'refs/heads/prod' && success()
with:
args: push ${RC_NAME}:latest
run: |
docker tag ${RC_NAME}:cache ${RC_NAME}:latest
docker push ${RC_NAME}:latest
36 changes: 36 additions & 0 deletions dags/ods/linkedin_post_insights/dags.py
@@ -0,0 +1,36 @@
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from ods.linkedin_post_insights import udfs

DEFAULT_ARGS = {
"owner": "Angus Yang",
"depends_on_past": False,
"start_date": datetime(2023, 6, 14, 0),
"retries": 2,
"retry_delay": timedelta(minutes=5),
"on_failure_callback": lambda x: "Need to send notification to Discord!",
}
dag = DAG(
"LINKEDIN_POST_INSIGHTS_V1",
default_args=DEFAULT_ARGS,
schedule_interval="5 8 * * *",
max_active_runs=1,
catchup=False,
)
with dag:
CREATE_TABLE_IF_NEEDED = PythonOperator(
task_id="CREATE_TABLE_IF_NEEDED", python_callable=udfs.create_table_if_needed,
)

SAVE_LINKEDIN_POSTS_AND_INSIGHTS = PythonOperator(
task_id="SAVE_LINKEDIN_POSTS_AND_INSIGHTS",
python_callable=udfs.save_posts_and_insights,
)

CREATE_TABLE_IF_NEEDED >> SAVE_LINKEDIN_POSTS_AND_INSIGHTS


if __name__ == "__main__":
dag.cli()
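
Note that the on_failure_callback above is a placeholder lambda. A minimal sketch of what a real Discord notification callback might look like, assuming the webhook URL is stored in a hypothetical Airflow Variable named DISCORD_WEBHOOK_URL (not part of this commit):

import requests
from airflow.models import Variable


def notify_discord_on_failure(context):
    # Hypothetical helper; the Variable name below is an assumption.
    webhook_url = Variable.get("DISCORD_WEBHOOK_URL")
    ti = context["task_instance"]
    message = f"Airflow task failed: {ti.dag_id}.{ti.task_id} (run: {context['ds']})"
    requests.post(webhook_url, json={"content": message}, timeout=30)

# Usage sketch: DEFAULT_ARGS["on_failure_callback"] = notify_discord_on_failure
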
226 changes: 226 additions & 0 deletions dags/ods/linkedin_post_insights/udfs.py
@@ -0,0 +1,226 @@
import logging
import os
from datetime import datetime
from typing import List, Optional

import requests
from airflow.models import Variable
from google.cloud import bigquery

logger = logging.getLogger(__name__)


def create_table_if_needed() -> None:
client = bigquery.Client(project=os.getenv("BIGQUERY_PROJECT"))
post_sql = """
CREATE TABLE IF NOT EXISTS `pycontw-225217.ods.ods_pycontw_linkedin_posts` (
id STRING,
created_at TIMESTAMP,
message STRING
)
"""
client.query(post_sql)
insights_sql = """
CREATE TABLE IF NOT EXISTS `pycontw-225217.ods.ods_pycontw_linkedin_posts_insights` (
post_id STRING,
query_time TIMESTAMP,
period STRING,
favorite INTEGER,
reply INTEGER,
retweet INTEGER,
views INTEGER
)
"""
client.query(insights_sql)

# Example output from the RapidAPI endpoint; not all fields will exist for a specific post
#
# {
# "text": "For your kids in senior high.",
# "totalReactionCount": 6,
# "likeCount": 6,
# "repostsCount": 1,
# "empathyCount": 1,
# "commentsCount": 20,
# repostsCount:1,
# "postUrl": "https://www.linkedin.com/feed/update/urn:li:activity:6940542340960763905/",
# "postedAt": "1yr",
# "postedDate": "2022-06-09 05:57:23.126 +0000 UTC",
# "postedDateTimestamp": 1654754243126,
# "urn": "6940542340960763905",
# "author": {
# "firstName": "Angus",
# "lastName": "Yang",
# "username": "angus-yang-8885279a",
# "url": "https://www.linkedin.com/in/angus-yang-8885279a"
# },
# "company": {},
# "article": {
# "title": "2022 AWS STEM Summer Camp On The Cloud",
# "subtitle": "pages.awscloud.com • 2 min read",
# "link": "https://pages.awscloud.com/tw-2022-aws-stem-summer-camp-on-the-cloud_registration.html"
# }
# },


def save_posts_and_insights() -> None:
posts = request_posts_data()

last_post = query_last_post()
new_posts = (
[
post
for post in posts
if post["postedDateTimestamp"] > last_post["created_at"].timestamp()
]
if last_post
else posts
)

if not dump_posts_to_bigquery(
[
{
"id": post["urn"],
"created_at": post["postedDateTimestamp"],
"message": post["text"],
}
for post in new_posts
]
):
raise RuntimeError("Failed to dump posts to BigQuery")

if not dump_posts_insights_to_bigquery(
[
{
"post_id": post["urn"],
"query_time": datetime.now().timestamp(),
"period": "lifetime",
"favorite": post["likeCount"],
"reply": post["commentsCount"],
"retweet": post["repostsCount"],
"views": "0", # not support by RapidAPI
}
for post in posts
]
):
raise RuntimeError("Failed to dump posts insights to BigQuery")


def query_last_post() -> Optional[dict]:
client = bigquery.Client(project=os.getenv("BIGQUERY_PROJECT"))
sql = """
SELECT
created_at
FROM
`pycontw-225217.ods.ods_pycontw_linkedin_posts`
ORDER BY
created_at DESC
LIMIT 1
"""
result = client.query(sql)
data = list(result)
return data[0] if data else None


def request_posts_data() -> List[dict]:

# Define the request options
# url = 'https://linkedin-data-api.p.rapidapi.com/get-profile-posts' # for user
url = "https://linkedin-data-api.p.rapidapi.com/get-company-posts"
querystring = {"username": "pycontw"}
headers = {
"X-RapidAPI-Key": Variable.get("LINKEDIN_RAPIDAPI_KEY"),
"X-RapidAPI-Host": "linkedin-data-api.p.rapidapi.com",
}

response = requests.get(url, headers=headers, params=querystring, timeout=180)
if not response.ok:
raise RuntimeError(f"Failed to fetch posts data: {response.text}")

media_insight_list = []
media_res_list = response.json()["data"]
# Format handling: the response may not include every required field
for media_res in media_res_list:
media_insight = {}
media_insight["urn"] = media_res.get("urn", "0")
media_insight["postedDateTimestamp"] = (
media_res.get("postedDateTimestamp", "0") / 1000
)
media_insight["text"] = media_res.get("text", "No Content")
media_insight["likeCount"] = media_res.get("totalReactionCount", "0")
media_insight["commentsCount"] = media_res.get("commentsCount", "0")
media_insight["repostsCount"] = media_res.get("repostsCount", "0")
# logger.info(media_insight)
media_insight_list.append(media_insight)

return media_insight_list
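
A quick way to exercise the field-default handling above without hitting RapidAPI is to mock the HTTP call and the Airflow Variable; a sketch (assuming the module is importable as ods.linkedin_post_insights.udfs and is run outside a scheduled DAG; the API key below is a dummy value):

from unittest import mock

from ods.linkedin_post_insights import udfs

fake_response = mock.Mock(ok=True)
fake_response.json.return_value = {
    "data": [{"urn": "6940542340960763905", "postedDateTimestamp": 1654754243126}]
}

with mock.patch("requests.get", return_value=fake_response), mock.patch.object(
    udfs.Variable, "get", return_value="dummy-rapidapi-key"
):
    posts = udfs.request_posts_data()

# Fields missing from the response fall back to defaults, e.g. text -> "No Content".
print(posts[0]["text"], posts[0]["likeCount"])
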


def dump_posts_to_bigquery(posts: List[dict]) -> bool:
if not posts:
logger.info("No posts to dump!")
return True

client = bigquery.Client(project=os.getenv("BIGQUERY_PROJECT"))
job_config = bigquery.LoadJobConfig(
schema=[
bigquery.SchemaField("id", "STRING", mode="REQUIRED"),
bigquery.SchemaField("created_at", "TIMESTAMP", mode="REQUIRED"),
bigquery.SchemaField("message", "STRING", mode="REQUIRED"),
],
write_disposition="WRITE_APPEND",
)
try:
job = client.load_table_from_json(
posts,
"pycontw-225217.ods.ods_pycontw_linkedin_posts",
job_config=job_config,
)
job.result()
return True
except Exception as e:
logger.error(f"Failed to dump posts to BigQuery: {e}", exc_info=True)
return False


def dump_posts_insights_to_bigquery(posts: List[dict]) -> bool:
if not posts:
logger.info("No post insights to dump!")
return True

client = bigquery.Client(project=os.getenv("BIGQUERY_PROJECT"))
job_config = bigquery.LoadJobConfig(
schema=[
bigquery.SchemaField("post_id", "STRING", mode="REQUIRED"),
bigquery.SchemaField("query_time", "TIMESTAMP", mode="REQUIRED"),
bigquery.SchemaField("period", "STRING", mode="REQUIRED"),
bigquery.SchemaField("favorite", "INTEGER", mode="NULLABLE"),
bigquery.SchemaField("reply", "INTEGER", mode="NULLABLE"),
bigquery.SchemaField("retweet", "INTEGER", mode="NULLABLE"),
bigquery.SchemaField("views", "INTEGER", mode="NULLABLE"),
],
write_disposition="WRITE_APPEND",
)
try:
job = client.load_table_from_json(
posts,
"pycontw-225217.ods.ods_pycontw_linkedin_posts_insights",
job_config=job_config,
)
job.result()
return True
except Exception as e:
logger.error(f"Failed to dump posts insights to BigQuery: {e}", exc_info=True)
return False


def test_main():
create_table_if_needed()

# request_posts_data()

save_posts_and_insights()


if __name__ == "__main__":
test_main()
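
After a run completes, the loaded rows can be spot-checked directly in BigQuery; a minimal sketch (assuming local credentials with read access to the pycontw-225217 project):

from google.cloud import bigquery

client = bigquery.Client(project="pycontw-225217")
rows = client.query(
    """
    SELECT post_id, query_time, favorite, reply, retweet
    FROM `pycontw-225217.ods.ods_pycontw_linkedin_posts_insights`
    ORDER BY query_time DESC
    LIMIT 5
    """
).result()
for row in rows:
    # Each row is a bigquery Row; dict() gives column-name -> value pairs.
    print(dict(row))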
