diff --git a/flask/contentapi/youtube/comment_threads.py b/flask/contentapi/youtube/comment_threads.py index 536730f..433e94f 100644 --- a/flask/contentapi/youtube/comment_threads.py +++ b/flask/contentapi/youtube/comment_threads.py @@ -1,5 +1,6 @@ import googleapiclient.discovery import googleapiclient.errors +import asyncio from enum import Enum from .base import BaseApi @@ -30,16 +31,17 @@ class CommentThreads(BaseApi): def __init__(self, api_key: str): super().__init__(api_key=api_key) - def list(self, - part: set[ParamPart], - max_results: int = None, - order: ParamOrder = None, - page_token: str = None, - search_terms: str = None, - text_format: ParamTextFormat = None, - id: set[str] = None, - video_id: str = None - ): + async def list( + self, + part: set[ParamPart], + max_results: int = None, + order: ParamOrder = None, + page_token: str = None, + search_terms: str = None, + text_format: ParamTextFormat = None, + id: set[str] = None, + video_id: str = None + ): ''' Returns a list of comment threads that match the API request parameters. ''' @@ -99,5 +101,7 @@ def list(self, self.api_service_name, self.api_version, developerKey=self.api_key) request = youtube.commentThreads().list(**params) - response = request.execute() + loop = asyncio.get_event_loop() + future = loop.run_in_executor(None, request.execute) + response = await future return response diff --git a/flask/contentapi/youtube/comments.py b/flask/contentapi/youtube/comments.py index e92cfdd..605ec60 100644 --- a/flask/contentapi/youtube/comments.py +++ b/flask/contentapi/youtube/comments.py @@ -1,9 +1,12 @@ import googleapiclient.discovery import googleapiclient.errors +import asyncio from enum import Enum +from glom import glom from .base import BaseApi from contentapi.helpers import normalize_params +from models.youtube import YoutubeComment class ParamPart(Enum): @@ -24,14 +27,15 @@ class Comments(BaseApi): def __init__(self, api_key: str): super().__init__(api_key=api_key) - def list(self, - part: set[ParamPart], - max_results: int = None, - page_token: str = None, - text_format: ParamTextFormat = None, - id: set[str] = None, - parent_id: str = None - ): + async def list( + self, + part: set[ParamPart], + max_results: int = None, + page_token: str = None, + text_format: ParamTextFormat = None, + id: set[str] = None, + parent_id: str = None + ): ''' Returns a list of comments that match the API request parameters. ''' @@ -82,5 +86,23 @@ def list(self, self.api_service_name, self.api_version, developerKey=self.api_key) request = youtube.comments().list(**params) - response = request.execute() + loop = asyncio.get_event_loop() + future = loop.run_in_executor(None, request.execute) + response = await future return response + + +def parse_api_comment(api_comment: dict) -> YoutubeComment: + if not api_comment: + api_comment = {} + yc = YoutubeComment( + comment_id=api_comment.get('id', ''), + video_id=glom(api_comment, 'snippet.videoId', default=''), + text_original=glom(api_comment, 'snippet.textOriginal', default=''), + parent_id=glom(api_comment, 'snippet.parentId', default=''), + author_channel_id=glom(api_comment, 'snippet.authorChannelId.value', default=''), + like_count=glom(api_comment, 'snippet.likeCount', default=0), + published_at=glom(api_comment, 'snippet.publishedAt', default=''), + updated_at=glom(api_comment, 'snippet.updatedAt', default=''), + ) + return yc diff --git a/flask/contentapi/youtube/youtube_helpers.py b/flask/contentapi/youtube/youtube_helpers.py index ad21201..72f9184 100644 --- a/flask/contentapi/youtube/youtube_helpers.py +++ b/flask/contentapi/youtube/youtube_helpers.py @@ -1,7 +1,7 @@ -from typing import Callable, Iterator +from typing import Callable -def api_pages_iterator(api_func: Callable, *args, **kwargs) -> Iterator: +async def api_pages_iterator(api_func: Callable, *args, **kwargs): ''' Iterate the api pages. Each page contains 'items' and may contain 'nextPageToken'. @@ -19,13 +19,14 @@ def api_pages_iterator(api_func: Callable, *args, **kwargs) -> Iterator: err = None response = None try: - response = api_func(*args, **api_kwargs) + response = await api_func(*args, **api_kwargs) except Exception as e: err = e - yield response, err + response = response or {} + items = response.get('items', []) + yield items, err if err or not response: break - items = response.get('items') if isinstance(items, list): items_count += len(items) next_page_token = response.get('nextPageToken') diff --git a/flask/flaskr/bl.py b/flask/flaskr/bl.py index c8df6dc..df4075d 100644 --- a/flask/flaskr/bl.py +++ b/flask/flaskr/bl.py @@ -1,30 +1,102 @@ +import asyncio from glom import glom -from contentapi.youtube import comment_threads +from contentapi.youtube import comment_threads, comments from contentapi.youtube.youtube_helpers import api_pages_iterator +from .db import insert_many_comments +WORKERS_NUM = 13 -def run_all(video_id: str, api_key: str, max_items: int = None): - return keep_all_video_comment_thread_ids(video_id, api_key, max_items) +async def run_all(video_id: str, api_key: str, max_items: int = None): + if not max_items: + max_items = 999_999_999 + shared_data = {'comments_count': 0} + top_level_comment_queue = asyncio.Queue(maxsize=max_items or 0) -def keep_all_video_comment_thread_ids(video_id: str, api_key: str, max_items: int = None): - all_comment_thread_ids = [] + # Start replies workers, they wait to get ids from the top_level_comment_queue, + # fetch replies from the api and save content to db + replies_workers = [asyncio.create_task(replies_worker( + top_level_comment_queue, api_key, shared_data, max_items)) for _ in range(WORKERS_NUM)] + + # Read top level comments, save content to db and fill the top_level_comment_queue + await get_all_top_level_comments( + top_level_comment_queue, video_id, api_key, shared_data, max_items) + await asyncio.gather(*replies_workers, return_exceptions=True) + + +async def get_all_top_level_comments( + top_level_comment_queue: asyncio.Queue, + video_id: str, + api_key: str, + shared_data: dict, + max_items: int +): comment_threads_api = comment_threads.CommentThreads(api_key) pages_iterator = api_pages_iterator( comment_threads_api.list, max_results=max_items, - part={comment_threads.ParamPart.ID}, + part={comment_threads.ParamPart.ID, comment_threads.ParamPart.SNIPPET}, + order=comment_threads.ParamOrder.RELEVANCE, video_id=video_id, ) - for page, err in pages_iterator: + async for items, err in pages_iterator: if err: - # TODO: handle error + break # TODO: handle error + top_level_comments = [comments.parse_api_comment( + glom(item, 'snippet.topLevelComment')) for item in items] + if not top_level_comments: + continue + if shared_data['comments_count'] >= max_items: break - page_ids: list[str] = glom(page, ('items', ['id']), default=[]) - if not page_ids: - # TODO: log warning about empty page - pass - else: - all_comment_thread_ids.extend(page_ids) - return all_comment_thread_ids + comments_left: int = max_items - shared_data['comments_count'] + top_level_comments_to_send = top_level_comments[:comments_left] + shared_data['comments_count'] += len(top_level_comments_to_send) + + # Save in db + insert_many_comments(top_level_comments_to_send) + + # Send to queue + for tl_comment in top_level_comments_to_send: + await top_level_comment_queue.put(tl_comment.comment_id) + + +async def get_all_comment_replies( + comment_id: str, + api_key: str, + shared_data: dict, + max_items: int +): + comments_api = comments.Comments(api_key) + pages_iterator = api_pages_iterator( + comments_api.list, + max_results=max_items, + part={comments.ParamPart.ID, comments.ParamPart.SNIPPET}, + parent_id=comment_id, + ) + async for items, err in pages_iterator: + if err: + break # TODO: handle error + replies = [comments.parse_api_comment(item) for item in items] + if not replies: + continue + if shared_data['comments_count'] >= max_items: + break + comments_left: int = max_items - shared_data['comments_count'] + replies_to_send = replies[:comments_left] + shared_data['comments_count'] += len(replies_to_send) + + # Save in db + insert_many_comments(replies_to_send) + + +async def replies_worker( + top_level_comment_queue: asyncio.Queue, + api_key: str, + shared_data: dict, + max_items: int +): + while shared_data['comments_count'] < max_items: + comment_id = await top_level_comment_queue.get() + await get_all_comment_replies(comment_id, api_key, shared_data, max_items) + top_level_comment_queue.task_done() diff --git a/flask/flaskr/db.py b/flask/flaskr/db.py new file mode 100644 index 0000000..af49853 --- /dev/null +++ b/flask/flaskr/db.py @@ -0,0 +1,2 @@ +def insert_many_comments(many): + pass diff --git a/flask/flaskr/views.py b/flask/flaskr/views.py index e3c0e00..3aa00bb 100644 --- a/flask/flaskr/views.py +++ b/flask/flaskr/views.py @@ -1,20 +1,18 @@ import os +import time from flaskr import app from flask import request from .bl import run_all @app.route('/youtube/videos//comments', methods=['PUT']) -def scan_youtube_video_comments(id: str): +async def scan_youtube_video_comments(id: str): max_items = request.args.get('max', None, int) API_KEY = os.environ.get('YOUTUBE_API_KEY', '') - re = run_all(video_id=id, api_key=API_KEY, max_items=max_items) - return {'res': re}, 202 - - # TODO: Change this - re = comments_api.list( - part={comments.ParamPart.ID, comments.ParamPart.SNIPPET}, - id={id}, - max_results=10 - ) - return re, 202 + start_time = time.perf_counter() + try: + await run_all(video_id=id, api_key=API_KEY, max_items=max_items) + elapsed = time.perf_counter() - start_time + return {"message": "Accepted", "time": elapsed}, 202 + except Exception: + return {"message": "Failed"}, 500 diff --git a/flask/models/__init__.py b/flask/models/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/flask/models/youtube.py b/flask/models/youtube.py new file mode 100644 index 0000000..acb7c32 --- /dev/null +++ b/flask/models/youtube.py @@ -0,0 +1,13 @@ +from dataclasses import dataclass + + +@dataclass +class YoutubeComment: + comment_id: str + video_id: str + text_original: str + parent_id: str + author_channel_id: str + like_count: int + published_at: str + updated_at: str diff --git a/flask/requirements.txt b/flask/requirements.txt index e7f339d..7bcd89d 100644 --- a/flask/requirements.txt +++ b/flask/requirements.txt @@ -1,4 +1,5 @@ Flask==2.0.1 +asgiref==3.4.1 google-api-python-client==2.15.0 google-auth-oauthlib==0.4.5 google-auth-httplib2==0.1.0