diff --git a/.env b/.env new file mode 100644 index 0000000..e2e0ff2 --- /dev/null +++ b/.env @@ -0,0 +1 @@ +ASYNC_WORKERS=13 diff --git a/flask/.env.example b/flask/.env.dev similarity index 73% rename from flask/.env.example rename to flask/.env.dev index 2d55f53..405b177 100644 --- a/flask/.env.example +++ b/flask/.env.dev @@ -1,5 +1,5 @@ FLASK_ENV=development -FLASK_APP=flaskr - YOUTUBE_API_KEY=A...z + +ASYNC_WORKERS=13 diff --git a/flask/.flake8 b/flask/.flake8 index 76df9f7..f69503b 100644 --- a/flask/.flake8 +++ b/flask/.flake8 @@ -1,5 +1,5 @@ [flake8] -ignore = E402 +ignore = E402, W504 exclude = .git,__pycache__,docs/conf.py,old,build,dist,env max-line-length = 100 \ No newline at end of file diff --git a/flask/contentapi/youtube/comments.py b/flask/contentapi/youtube/comments.py index 605ec60..4d58dda 100644 --- a/flask/contentapi/youtube/comments.py +++ b/flask/contentapi/youtube/comments.py @@ -99,10 +99,10 @@ def parse_api_comment(api_comment: dict) -> YoutubeComment: comment_id=api_comment.get('id', ''), video_id=glom(api_comment, 'snippet.videoId', default=''), text_original=glom(api_comment, 'snippet.textOriginal', default=''), - parent_id=glom(api_comment, 'snippet.parentId', default=''), + parent_id=glom(api_comment, 'snippet.parentId', default=None), author_channel_id=glom(api_comment, 'snippet.authorChannelId.value', default=''), like_count=glom(api_comment, 'snippet.likeCount', default=0), published_at=glom(api_comment, 'snippet.publishedAt', default=''), - updated_at=glom(api_comment, 'snippet.updatedAt', default=''), + updated_at=glom(api_comment, 'snippet.updatedAt', default='') ) return yc diff --git a/flask/flaskr/bl.py b/flask/flaskr/bl.py index df4075d..1aa7300 100644 --- a/flask/flaskr/bl.py +++ b/flask/flaskr/bl.py @@ -1,28 +1,33 @@ import asyncio +import os +import sys from glom import glom from contentapi.youtube import comment_threads, comments from contentapi.youtube.youtube_helpers import api_pages_iterator -from .db import insert_many_comments +from models.youtube import YoutubeComment +from . import db -WORKERS_NUM = 13 - -async def run_all(video_id: str, api_key: str, max_items: int = None): +async def scan_youtube_video_comments(video_id: str, api_key: str, max_items: int = None): + WORKERS_NUM = int(os.environ.get('ASYNC_WORKERS', 10)) if not max_items: max_items = 999_999_999 - shared_data = {'comments_count': 0} + shared_data = {'comments_count': 0, 'replies_count': 0, 'producer_alive': True} top_level_comment_queue = asyncio.Queue(maxsize=max_items or 0) # Start replies workers, they wait to get ids from the top_level_comment_queue, - # fetch replies from the api and save content to db + # fetch replies from the api and save to db replies_workers = [asyncio.create_task(replies_worker( top_level_comment_queue, api_key, shared_data, max_items)) for _ in range(WORKERS_NUM)] - # Read top level comments, save content to db and fill the top_level_comment_queue + # Read top level comments, save to db and fill the top_level_comment_queue await get_all_top_level_comments( top_level_comment_queue, video_id, api_key, shared_data, max_items) + shared_data['producer_alive'] = False + await asyncio.gather(*replies_workers, return_exceptions=True) + return shared_data['comments_count'], shared_data['replies_count'] async def get_all_top_level_comments( @@ -36,33 +41,46 @@ async def get_all_top_level_comments( pages_iterator = api_pages_iterator( comment_threads_api.list, max_results=max_items, - part={comment_threads.ParamPart.ID, comment_threads.ParamPart.SNIPPET}, + part={ + comment_threads.ParamPart.ID, + comment_threads.ParamPart.SNIPPET, + comment_threads.ParamPart.REPLIES}, order=comment_threads.ParamOrder.RELEVANCE, video_id=video_id, ) async for items, err in pages_iterator: if err: - break # TODO: handle error - top_level_comments = [comments.parse_api_comment( + print(err, file=sys.stderr) # TODO: handle error + break + top_level_comments: list[YoutubeComment] = [comments.parse_api_comment( glom(item, 'snippet.topLevelComment')) for item in items] + + # Count how many replies each top_level_comment has + replies_counts = [len(glom(item, 'replies.comments', default=[])) for item in items] + if not top_level_comments: continue if shared_data['comments_count'] >= max_items: break + + # To save less comments than the number of comments left until max_items comments_left: int = max_items - shared_data['comments_count'] - top_level_comments_to_send = top_level_comments[:comments_left] - shared_data['comments_count'] += len(top_level_comments_to_send) + top_level_comments_to_save = top_level_comments[:comments_left] + shared_data['comments_count'] += len(top_level_comments_to_save) - # Save in db - insert_many_comments(top_level_comments_to_send) + db.insert_many_youtube_comments(top_level_comments_to_save) - # Send to queue - for tl_comment in top_level_comments_to_send: - await top_level_comment_queue.put(tl_comment.comment_id) + # YouTube API returns CommentThread with only part of the full replies list, + # so we can't save all replies at this point. + # Therefore, we send the top comments to the queue to handle replies reading later. + for tl_comment, replies_count in zip(top_level_comments_to_save, replies_counts): + if replies_count > 0: + await top_level_comment_queue.put((tl_comment.comment_id, tl_comment.video_id)) async def get_all_comment_replies( comment_id: str, + video_id: str, api_key: str, shared_data: dict, max_items: int @@ -76,18 +94,26 @@ async def get_all_comment_replies( ) async for items, err in pages_iterator: if err: - break # TODO: handle error - replies = [comments.parse_api_comment(item) for item in items] + print(err, file=sys.stderr) # TODO: handle error + break + replies: list[YoutubeComment] = [comments.parse_api_comment(item) for item in items] + + # Add video_id field (youtube api omits the videoId from the replies) + for r in replies: + r.video_id = video_id + if not replies: continue if shared_data['comments_count'] >= max_items: break + + # To save less comments than the number of comments left until max_items comments_left: int = max_items - shared_data['comments_count'] - replies_to_send = replies[:comments_left] - shared_data['comments_count'] += len(replies_to_send) + replies_to_save = replies[:comments_left] + shared_data['comments_count'] += len(replies_to_save) + shared_data['replies_count'] += len(replies_to_save) - # Save in db - insert_many_comments(replies_to_send) + db.insert_many_youtube_comments(replies_to_save) async def replies_worker( @@ -97,6 +123,13 @@ async def replies_worker( max_items: int ): while shared_data['comments_count'] < max_items: - comment_id = await top_level_comment_queue.get() - await get_all_comment_replies(comment_id, api_key, shared_data, max_items) - top_level_comment_queue.task_done() + try: + comment_id, video_id = top_level_comment_queue.get_nowait() + await get_all_comment_replies(comment_id, video_id, api_key, shared_data, max_items) + top_level_comment_queue.task_done() + except asyncio.QueueEmpty: + if shared_data['producer_alive']: + await asyncio.sleep(0.2) + continue + else: + break diff --git a/flask/flaskr/db.py b/flask/flaskr/db.py index af49853..ee000b9 100644 --- a/flask/flaskr/db.py +++ b/flask/flaskr/db.py @@ -1,2 +1,2 @@ -def insert_many_comments(many): +def insert_many_youtube_comments(many): pass diff --git a/flask/flaskr/views.py b/flask/flaskr/views.py index 3aa00bb..b8272bc 100644 --- a/flask/flaskr/views.py +++ b/flask/flaskr/views.py @@ -2,17 +2,23 @@ import time from flaskr import app from flask import request -from .bl import run_all +from . import bl @app.route('/youtube/videos//comments', methods=['PUT']) async def scan_youtube_video_comments(id: str): max_items = request.args.get('max', None, int) - API_KEY = os.environ.get('YOUTUBE_API_KEY', '') + api_key = os.environ.get('YOUTUBE_API_KEY', '') start_time = time.perf_counter() try: - await run_all(video_id=id, api_key=API_KEY, max_items=max_items) + comments_count, replies_count = await bl.scan_youtube_video_comments( + video_id=id, api_key=api_key, max_items=max_items) elapsed = time.perf_counter() - start_time - return {"message": "Accepted", "time": elapsed}, 202 + return { + "message": "Accepted", + "time": elapsed, + 'comments_count': comments_count, + 'replies_count': replies_count, + }, 202 except Exception: return {"message": "Failed"}, 500 diff --git a/flask/models/youtube.py b/flask/models/youtube.py index acb7c32..de2a06a 100644 --- a/flask/models/youtube.py +++ b/flask/models/youtube.py @@ -1,4 +1,5 @@ from dataclasses import dataclass +from typing import Optional @dataclass @@ -6,7 +7,7 @@ class YoutubeComment: comment_id: str video_id: str text_original: str - parent_id: str + parent_id: Optional[str] author_channel_id: str like_count: int published_at: str