Skip to content

Commit

Permalink
Get comment replies concurrently using asyncio
Browse files Browse the repository at this point in the history
  • Loading branch information
NoamNol committed Aug 8, 2021
1 parent 9808ca6 commit 125c48c
Show file tree
Hide file tree
Showing 9 changed files with 164 additions and 51 deletions.
26 changes: 15 additions & 11 deletions flask/contentapi/youtube/comment_threads.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import googleapiclient.discovery
import googleapiclient.errors
import asyncio
from enum import Enum

from .base import BaseApi
Expand Down Expand Up @@ -30,16 +31,17 @@ class CommentThreads(BaseApi):
def __init__(self, api_key: str):
super().__init__(api_key=api_key)

def list(self,
part: set[ParamPart],
max_results: int = None,
order: ParamOrder = None,
page_token: str = None,
search_terms: str = None,
text_format: ParamTextFormat = None,
id: set[str] = None,
video_id: str = None
):
async def list(
self,
part: set[ParamPart],
max_results: int = None,
order: ParamOrder = None,
page_token: str = None,
search_terms: str = None,
text_format: ParamTextFormat = None,
id: set[str] = None,
video_id: str = None
):
'''
Returns a list of comment threads that match the API request parameters.
'''
Expand Down Expand Up @@ -99,5 +101,7 @@ def list(self,
self.api_service_name, self.api_version, developerKey=self.api_key)

request = youtube.commentThreads().list(**params)
response = request.execute()
loop = asyncio.get_event_loop()
future = loop.run_in_executor(None, request.execute)
response = await future
return response
40 changes: 31 additions & 9 deletions flask/contentapi/youtube/comments.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import googleapiclient.discovery
import googleapiclient.errors
import asyncio
from enum import Enum
from glom import glom

from .base import BaseApi
from contentapi.helpers import normalize_params
from models.youtube import YoutubeComment


class ParamPart(Enum):
Expand All @@ -24,14 +27,15 @@ class Comments(BaseApi):
def __init__(self, api_key: str):
super().__init__(api_key=api_key)

def list(self,
part: set[ParamPart],
max_results: int = None,
page_token: str = None,
text_format: ParamTextFormat = None,
id: set[str] = None,
parent_id: str = None
):
async def list(
self,
part: set[ParamPart],
max_results: int = None,
page_token: str = None,
text_format: ParamTextFormat = None,
id: set[str] = None,
parent_id: str = None
):
'''
Returns a list of comments that match the API request parameters.
'''
Expand Down Expand Up @@ -82,5 +86,23 @@ def list(self,
self.api_service_name, self.api_version, developerKey=self.api_key)

request = youtube.comments().list(**params)
response = request.execute()
loop = asyncio.get_event_loop()
future = loop.run_in_executor(None, request.execute)
response = await future
return response


def parse_api_comment(api_comment: dict) -> YoutubeComment:
if not api_comment:
api_comment = {}
yc = YoutubeComment(
comment_id=api_comment.get('id', ''),
video_id=glom(api_comment, 'snippet.videoId', default=''),
text_original=glom(api_comment, 'snippet.textOriginal', default=''),
parent_id=glom(api_comment, 'snippet.parentId', default=''),
author_channel_id=glom(api_comment, 'snippet.authorChannelId.value', default=''),
like_count=glom(api_comment, 'snippet.likeCount', default=0),
published_at=glom(api_comment, 'snippet.publishedAt', default=''),
updated_at=glom(api_comment, 'snippet.updatedAt', default=''),
)
return yc
11 changes: 6 additions & 5 deletions flask/contentapi/youtube/youtube_helpers.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from typing import Callable, Iterator
from typing import Callable


def api_pages_iterator(api_func: Callable, *args, **kwargs) -> Iterator:
async def api_pages_iterator(api_func: Callable, *args, **kwargs):
'''
Iterate the api pages.
Each page contains 'items' and may contain 'nextPageToken'.
Expand All @@ -19,13 +19,14 @@ def api_pages_iterator(api_func: Callable, *args, **kwargs) -> Iterator:
err = None
response = None
try:
response = api_func(*args, **api_kwargs)
response = await api_func(*args, **api_kwargs)
except Exception as e:
err = e
yield response, err
response = response or {}
items = response.get('items', [])
yield items, err
if err or not response:
break
items = response.get('items')
if isinstance(items, list):
items_count += len(items)
next_page_token = response.get('nextPageToken')
Expand Down
102 changes: 87 additions & 15 deletions flask/flaskr/bl.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,102 @@
import asyncio
from glom import glom

from contentapi.youtube import comment_threads
from contentapi.youtube import comment_threads, comments
from contentapi.youtube.youtube_helpers import api_pages_iterator
from .db import insert_many_comments

WORKERS_NUM = 13

def run_all(video_id: str, api_key: str, max_items: int = None):
return keep_all_video_comment_thread_ids(video_id, api_key, max_items)

async def run_all(video_id: str, api_key: str, max_items: int = None):
if not max_items:
max_items = 999_999_999
shared_data = {'comments_count': 0}
top_level_comment_queue = asyncio.Queue(maxsize=max_items or 0)

def keep_all_video_comment_thread_ids(video_id: str, api_key: str, max_items: int = None):
all_comment_thread_ids = []
# Start replies workers, they wait to get ids from the top_level_comment_queue,
# fetch replies from the api and save content to db
replies_workers = [asyncio.create_task(replies_worker(
top_level_comment_queue, api_key, shared_data, max_items)) for _ in range(WORKERS_NUM)]

# Read top level comments, save content to db and fill the top_level_comment_queue
await get_all_top_level_comments(
top_level_comment_queue, video_id, api_key, shared_data, max_items)
await asyncio.gather(*replies_workers, return_exceptions=True)


async def get_all_top_level_comments(
top_level_comment_queue: asyncio.Queue,
video_id: str,
api_key: str,
shared_data: dict,
max_items: int
):
comment_threads_api = comment_threads.CommentThreads(api_key)
pages_iterator = api_pages_iterator(
comment_threads_api.list,
max_results=max_items,
part={comment_threads.ParamPart.ID},
part={comment_threads.ParamPart.ID, comment_threads.ParamPart.SNIPPET},
order=comment_threads.ParamOrder.RELEVANCE,
video_id=video_id,
)
for page, err in pages_iterator:
async for items, err in pages_iterator:
if err:
# TODO: handle error
break # TODO: handle error
top_level_comments = [comments.parse_api_comment(
glom(item, 'snippet.topLevelComment')) for item in items]
if not top_level_comments:
continue
if shared_data['comments_count'] >= max_items:
break
page_ids: list[str] = glom(page, ('items', ['id']), default=[])
if not page_ids:
# TODO: log warning about empty page
pass
else:
all_comment_thread_ids.extend(page_ids)
return all_comment_thread_ids
comments_left: int = max_items - shared_data['comments_count']
top_level_comments_to_send = top_level_comments[:comments_left]
shared_data['comments_count'] += len(top_level_comments_to_send)

# Save in db
insert_many_comments(top_level_comments_to_send)

# Send to queue
for tl_comment in top_level_comments_to_send:
await top_level_comment_queue.put(tl_comment.comment_id)


async def get_all_comment_replies(
comment_id: str,
api_key: str,
shared_data: dict,
max_items: int
):
comments_api = comments.Comments(api_key)
pages_iterator = api_pages_iterator(
comments_api.list,
max_results=max_items,
part={comments.ParamPart.ID, comments.ParamPart.SNIPPET},
parent_id=comment_id,
)
async for items, err in pages_iterator:
if err:
break # TODO: handle error
replies = [comments.parse_api_comment(item) for item in items]
if not replies:
continue
if shared_data['comments_count'] >= max_items:
break
comments_left: int = max_items - shared_data['comments_count']
replies_to_send = replies[:comments_left]
shared_data['comments_count'] += len(replies_to_send)

# Save in db
insert_many_comments(replies_to_send)


async def replies_worker(
top_level_comment_queue: asyncio.Queue,
api_key: str,
shared_data: dict,
max_items: int
):
while shared_data['comments_count'] < max_items:
comment_id = await top_level_comment_queue.get()
await get_all_comment_replies(comment_id, api_key, shared_data, max_items)
top_level_comment_queue.task_done()
2 changes: 2 additions & 0 deletions flask/flaskr/db.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
def insert_many_comments(many):
pass
20 changes: 9 additions & 11 deletions flask/flaskr/views.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,18 @@
import os
import time
from flaskr import app
from flask import request
from .bl import run_all


@app.route('/youtube/videos/<id>/comments', methods=['PUT'])
def scan_youtube_video_comments(id: str):
async def scan_youtube_video_comments(id: str):
max_items = request.args.get('max', None, int)
API_KEY = os.environ.get('YOUTUBE_API_KEY', '')
re = run_all(video_id=id, api_key=API_KEY, max_items=max_items)
return {'res': re}, 202

# TODO: Change this
re = comments_api.list(
part={comments.ParamPart.ID, comments.ParamPart.SNIPPET},
id={id},
max_results=10
)
return re, 202
start_time = time.perf_counter()
try:
await run_all(video_id=id, api_key=API_KEY, max_items=max_items)
elapsed = time.perf_counter() - start_time
return {"message": "Accepted", "time": elapsed}, 202
except Exception:
return {"message": "Failed"}, 500
Empty file added flask/models/__init__.py
Empty file.
13 changes: 13 additions & 0 deletions flask/models/youtube.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from dataclasses import dataclass


@dataclass
class YoutubeComment:
comment_id: str
video_id: str
text_original: str
parent_id: str
author_channel_id: str
like_count: int
published_at: str
updated_at: str
1 change: 1 addition & 0 deletions flask/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
Flask==2.0.1
asgiref==3.4.1
google-api-python-client==2.15.0
google-auth-oauthlib==0.4.5
google-auth-httplib2==0.1.0
Expand Down

0 comments on commit 125c48c

Please sign in to comment.