Skip to content

Commit

Permalink
Refactor and fix dead producer async block
Browse files Browse the repository at this point in the history
  • Loading branch information
NoamNol committed Aug 11, 2021
1 parent 125c48c commit 7ec6c20
Show file tree
Hide file tree
Showing 8 changed files with 78 additions and 37 deletions.
1 change: 1 addition & 0 deletions .env
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ASYNC_WORKERS=13
4 changes: 2 additions & 2 deletions flask/.env.example → flask/.env.dev
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
FLASK_ENV=development

FLASK_APP=flaskr

YOUTUBE_API_KEY=A...z

ASYNC_WORKERS=13
2 changes: 1 addition & 1 deletion flask/.flake8
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@

[flake8]
ignore = E402
ignore = E402, W504
exclude = .git,__pycache__,docs/conf.py,old,build,dist,env
max-line-length = 100
4 changes: 2 additions & 2 deletions flask/contentapi/youtube/comments.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,10 @@ def parse_api_comment(api_comment: dict) -> YoutubeComment:
comment_id=api_comment.get('id', ''),
video_id=glom(api_comment, 'snippet.videoId', default=''),
text_original=glom(api_comment, 'snippet.textOriginal', default=''),
parent_id=glom(api_comment, 'snippet.parentId', default=''),
parent_id=glom(api_comment, 'snippet.parentId', default=None),
author_channel_id=glom(api_comment, 'snippet.authorChannelId.value', default=''),
like_count=glom(api_comment, 'snippet.likeCount', default=0),
published_at=glom(api_comment, 'snippet.publishedAt', default=''),
updated_at=glom(api_comment, 'snippet.updatedAt', default=''),
updated_at=glom(api_comment, 'snippet.updatedAt', default='')
)
return yc
85 changes: 59 additions & 26 deletions flask/flaskr/bl.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,33 @@
import asyncio
import os
import sys
from glom import glom

from contentapi.youtube import comment_threads, comments
from contentapi.youtube.youtube_helpers import api_pages_iterator
from .db import insert_many_comments
from models.youtube import YoutubeComment
from . import db

WORKERS_NUM = 13


async def run_all(video_id: str, api_key: str, max_items: int = None):
async def scan_youtube_video_comments(video_id: str, api_key: str, max_items: int = None):
WORKERS_NUM = int(os.environ.get('ASYNC_WORKERS', 10))
if not max_items:
max_items = 999_999_999
shared_data = {'comments_count': 0}
shared_data = {'comments_count': 0, 'replies_count': 0, 'producer_alive': True}
top_level_comment_queue = asyncio.Queue(maxsize=max_items or 0)

# Start replies workers, they wait to get ids from the top_level_comment_queue,
# fetch replies from the api and save content to db
# fetch replies from the api and save to db
replies_workers = [asyncio.create_task(replies_worker(
top_level_comment_queue, api_key, shared_data, max_items)) for _ in range(WORKERS_NUM)]

# Read top level comments, save content to db and fill the top_level_comment_queue
# Read top level comments, save to db and fill the top_level_comment_queue
await get_all_top_level_comments(
top_level_comment_queue, video_id, api_key, shared_data, max_items)
shared_data['producer_alive'] = False

await asyncio.gather(*replies_workers, return_exceptions=True)
return shared_data['comments_count'], shared_data['replies_count']


async def get_all_top_level_comments(
Expand All @@ -36,33 +41,46 @@ async def get_all_top_level_comments(
pages_iterator = api_pages_iterator(
comment_threads_api.list,
max_results=max_items,
part={comment_threads.ParamPart.ID, comment_threads.ParamPart.SNIPPET},
part={
comment_threads.ParamPart.ID,
comment_threads.ParamPart.SNIPPET,
comment_threads.ParamPart.REPLIES},
order=comment_threads.ParamOrder.RELEVANCE,
video_id=video_id,
)
async for items, err in pages_iterator:
if err:
break # TODO: handle error
top_level_comments = [comments.parse_api_comment(
print(err, file=sys.stderr) # TODO: handle error
break
top_level_comments: list[YoutubeComment] = [comments.parse_api_comment(
glom(item, 'snippet.topLevelComment')) for item in items]

# Count how many replies each top_level_comment has
replies_counts = [len(glom(item, 'replies.comments', default=[])) for item in items]

if not top_level_comments:
continue
if shared_data['comments_count'] >= max_items:
break

# Save no more comments than the number still allowed before reaching max_items
comments_left: int = max_items - shared_data['comments_count']
top_level_comments_to_send = top_level_comments[:comments_left]
shared_data['comments_count'] += len(top_level_comments_to_send)
top_level_comments_to_save = top_level_comments[:comments_left]
shared_data['comments_count'] += len(top_level_comments_to_save)

# Save in db
insert_many_comments(top_level_comments_to_send)
db.insert_many_youtube_comments(top_level_comments_to_save)

# Send to queue
for tl_comment in top_level_comments_to_send:
await top_level_comment_queue.put(tl_comment.comment_id)
# The YouTube API returns each CommentThread with only part of its full replies list,
# so we can't save all replies at this point.
# Therefore, we queue each top-level comment that has replies so they can be fetched later.
for tl_comment, replies_count in zip(top_level_comments_to_save, replies_counts):
if replies_count > 0:
await top_level_comment_queue.put((tl_comment.comment_id, tl_comment.video_id))


async def get_all_comment_replies(
comment_id: str,
video_id: str,
api_key: str,
shared_data: dict,
max_items: int
Expand All @@ -76,18 +94,26 @@ async def get_all_comment_replies(
)
async for items, err in pages_iterator:
if err:
break # TODO: handle error
replies = [comments.parse_api_comment(item) for item in items]
print(err, file=sys.stderr) # TODO: handle error
break
replies: list[YoutubeComment] = [comments.parse_api_comment(item) for item in items]

# Add video_id field (youtube api omits the videoId from the replies)
for r in replies:
r.video_id = video_id

if not replies:
continue
if shared_data['comments_count'] >= max_items:
break

# Save no more replies than the number of comments still allowed before reaching max_items
comments_left: int = max_items - shared_data['comments_count']
replies_to_send = replies[:comments_left]
shared_data['comments_count'] += len(replies_to_send)
replies_to_save = replies[:comments_left]
shared_data['comments_count'] += len(replies_to_save)
shared_data['replies_count'] += len(replies_to_save)

# Save in db
insert_many_comments(replies_to_send)
db.insert_many_youtube_comments(replies_to_save)


async def replies_worker(
Expand All @@ -97,6 +123,13 @@ async def replies_worker(
max_items: int
):
while shared_data['comments_count'] < max_items:
comment_id = await top_level_comment_queue.get()
await get_all_comment_replies(comment_id, api_key, shared_data, max_items)
top_level_comment_queue.task_done()
try:
comment_id, video_id = top_level_comment_queue.get_nowait()
await get_all_comment_replies(comment_id, video_id, api_key, shared_data, max_items)
top_level_comment_queue.task_done()
except asyncio.QueueEmpty:
if shared_data['producer_alive']:
await asyncio.sleep(0.2)
continue
else:
break
2 changes: 1 addition & 1 deletion flask/flaskr/db.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
def insert_many_comments(many):
def insert_many_youtube_comments(many):
pass
14 changes: 10 additions & 4 deletions flask/flaskr/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,23 @@
import time
from flaskr import app
from flask import request
from .bl import run_all
from . import bl


@app.route('/youtube/videos/<id>/comments', methods=['PUT'])
async def scan_youtube_video_comments(id: str):
max_items = request.args.get('max', None, int)
API_KEY = os.environ.get('YOUTUBE_API_KEY', '')
api_key = os.environ.get('YOUTUBE_API_KEY', '')
start_time = time.perf_counter()
try:
await run_all(video_id=id, api_key=API_KEY, max_items=max_items)
comments_count, replies_count = await bl.scan_youtube_video_comments(
video_id=id, api_key=api_key, max_items=max_items)
elapsed = time.perf_counter() - start_time
return {"message": "Accepted", "time": elapsed}, 202
return {
"message": "Accepted",
"time": elapsed,
'comments_count': comments_count,
'replies_count': replies_count,
}, 202
except Exception:
return {"message": "Failed"}, 500
3 changes: 2 additions & 1 deletion flask/models/youtube.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
from dataclasses import dataclass
from typing import Optional


@dataclass
class YoutubeComment:
comment_id: str
video_id: str
text_original: str
parent_id: str
parent_id: Optional[str]
author_channel_id: str
like_count: int
published_at: str
Expand Down

0 comments on commit 7ec6c20

Please sign in to comment.