From aa9bb71c211ea46f1601b58022d1cb946494c2e0 Mon Sep 17 00:00:00 2001 From: NoamNol Date: Fri, 6 Aug 2021 12:29:07 +0300 Subject: [PATCH 01/13] Basic integration with YouTube api --- flask/.env.example | 5 + flask/.flake8 | 5 + .gitignore => flask/.gitignore | 0 flask/.vscode/settings.json | 5 + flask/app.py | 4 + flask/contentapi/__init__.py | 0 flask/contentapi/helpers.py | 36 +++++++ flask/contentapi/youtube/__init__.py | 0 flask/contentapi/youtube/base.py | 8 ++ flask/contentapi/youtube/comment_threads.py | 102 ++++++++++++++++++++ flask/contentapi/youtube/comments.py | 85 ++++++++++++++++ flask/flaskr/__init__.py | 5 + flask/flaskr/views.py | 17 ++++ flask/requirements-dev.txt | 3 + flask/requirements.txt | 5 + 15 files changed, 280 insertions(+) create mode 100644 flask/.env.example create mode 100644 flask/.flake8 rename .gitignore => flask/.gitignore (100%) create mode 100644 flask/.vscode/settings.json create mode 100644 flask/app.py create mode 100644 flask/contentapi/__init__.py create mode 100644 flask/contentapi/helpers.py create mode 100644 flask/contentapi/youtube/__init__.py create mode 100644 flask/contentapi/youtube/base.py create mode 100644 flask/contentapi/youtube/comment_threads.py create mode 100644 flask/contentapi/youtube/comments.py create mode 100644 flask/flaskr/__init__.py create mode 100644 flask/flaskr/views.py create mode 100644 flask/requirements-dev.txt create mode 100644 flask/requirements.txt diff --git a/flask/.env.example b/flask/.env.example new file mode 100644 index 0000000..2d55f53 --- /dev/null +++ b/flask/.env.example @@ -0,0 +1,5 @@ +FLASK_ENV=development + +FLASK_APP=flaskr + +YOUTUBE_API_KEY=A...z diff --git a/flask/.flake8 b/flask/.flake8 new file mode 100644 index 0000000..76df9f7 --- /dev/null +++ b/flask/.flake8 @@ -0,0 +1,5 @@ + +[flake8] +ignore = E402 +exclude = .git,__pycache__,docs/conf.py,old,build,dist,env +max-line-length = 100 \ No newline at end of file diff --git a/.gitignore b/flask/.gitignore 
similarity index 100% rename from .gitignore rename to flask/.gitignore diff --git a/flask/.vscode/settings.json b/flask/.vscode/settings.json new file mode 100644 index 0000000..66db099 --- /dev/null +++ b/flask/.vscode/settings.json @@ -0,0 +1,5 @@ +{ + "python.linting.pylintEnabled": true, + "python.linting.enabled": true, + "python.linting.flake8Enabled": true, +} \ No newline at end of file diff --git a/flask/app.py b/flask/app.py new file mode 100644 index 0000000..fee57ba --- /dev/null +++ b/flask/app.py @@ -0,0 +1,4 @@ +from dotenv import load_dotenv +load_dotenv() + +from flaskr import app # noqa diff --git a/flask/contentapi/__init__.py b/flask/contentapi/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/flask/contentapi/helpers.py b/flask/contentapi/helpers.py new file mode 100644 index 0000000..5b40378 --- /dev/null +++ b/flask/contentapi/helpers.py @@ -0,0 +1,36 @@ +from collections import Counter + + +def normalize_params(params_data: list[dict]) -> dict[str, str]: + ''' + 1. Validate parameters value + 2. Remove missing parameters or use default value + 3. 
Force no more than one parameter in group (For example, only one 'filter' parameter) + ''' + result = {} + groups_counter = Counter() + + for p in params_data: + name = p['name'] + value = p['value'] + to_string = p.get('toString', str) + group = p.get('group') + validator = p.get('validator') + + if value is None: + default = p.get('default') + if default is not None: + value = default + else: + continue + if group: + groups_counter[group] += 1 + if groups_counter[group] > 1: + raise ValueError(f"Must specify exactly one {group} parameter") + if validator: + message, is_valid = validator + if (not is_valid(value)): + raise ValueError(f"{name}:{value} is invalid parameter ({message})") + + result[name] = to_string(value) + return result diff --git a/flask/contentapi/youtube/__init__.py b/flask/contentapi/youtube/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/flask/contentapi/youtube/base.py b/flask/contentapi/youtube/base.py new file mode 100644 index 0000000..0c1469f --- /dev/null +++ b/flask/contentapi/youtube/base.py @@ -0,0 +1,8 @@ +from abc import ABC + + +class BaseApi(ABC): + def __init__(self, api_key: str): + self.api_key = api_key + self.api_service_name = "youtube" + self.api_version = "v3" diff --git a/flask/contentapi/youtube/comment_threads.py b/flask/contentapi/youtube/comment_threads.py new file mode 100644 index 0000000..4469539 --- /dev/null +++ b/flask/contentapi/youtube/comment_threads.py @@ -0,0 +1,102 @@ +import googleapiclient.discovery +import googleapiclient.errors +from enum import Enum + +from .base import BaseApi +from contentapi.helpers import normalize_params + + +class ParamPart(Enum): + ID = 'id' + REPLIES = 'replies' + SNIPPET = 'snippet' + + +class ParamOrder(Enum): + TIME = 'time' + RELEVANCE = 'relevance' + + +class ParamTextFormat(Enum): + HTML = 'html' + PLAIN_TEXT = 'plainText' + + +class CommentThreads(BaseApi): + ''' + YouTube CommentThreads api. 
+ See https://developers.google.com/youtube/v3/docs/commentThreads + ''' + def __init__(self, api_key: str): + super().__init__(api_key=api_key) + + def list(self, + part: set[ParamPart], + max_results: int = None, + order: ParamOrder = None, + page_token: str = None, + search_terms: str = None, + text_format: ParamTextFormat = None, + id: set[str] = None, + video_id: str = None + ): + ''' + Returns a list of comment threads that match the API request parameters. + ''' + params_data = [ + # Required parameters + { + 'name': 'part', + 'value': part, + 'toString': lambda v: ','.join([p.value for p in v]), + }, + # Filter parameters + { + 'name': 'id', + 'value': id, + 'toString': lambda v: ','.join(v), + 'group': 'filter', + }, + { + 'name': 'videoId', + 'value': video_id, + 'group': 'filter', + }, + # Optional parameters + { + 'name': 'maxResults', + 'value': max_results, + 'validator': ( + 'Acceptable values are 1 to 100', + lambda v: v >= 1 and v <= 100 + ) + }, + { + 'name': 'order', + 'value': order, + 'toString': lambda v: v.value, + }, + { + 'name': 'pageToken', + 'value': page_token, + }, + { + 'name': 'searchTerms', + 'value': search_terms, + }, + { + 'name': 'textFormat', + 'value': text_format, + 'toString': lambda v: v.value, + 'default': ParamTextFormat.PLAIN_TEXT, + }, + ] + + params = normalize_params(params_data) + + youtube = googleapiclient.discovery.build( + self.api_service_name, self.api_version, developerKey=self.api_key) + + request = youtube.commentthreads().list(**params) + response = request.execute() + return response diff --git a/flask/contentapi/youtube/comments.py b/flask/contentapi/youtube/comments.py new file mode 100644 index 0000000..982f1bf --- /dev/null +++ b/flask/contentapi/youtube/comments.py @@ -0,0 +1,85 @@ +import googleapiclient.discovery +import googleapiclient.errors +from enum import Enum + +from .base import BaseApi +from contentapi.helpers import normalize_params + + +class ParamPart(Enum): + ID = 'id' + SNIPPET = 
'snippet' + + +class ParamTextFormat(Enum): + HTML = 'html' + PLAIN_TEXT = 'plainText' + + +class Comments(BaseApi): + ''' + YouTube Comments api. + See https://developers.google.com/youtube/v3/docs/comments + ''' + def __init__(self, api_key: str): + super().__init__(api_key=api_key) + + def list(self, + part: set[ParamPart], + max_results: int = None, + page_token: str = None, + text_format: ParamTextFormat = None, + id: set[str] = None, + parent_id: str = None + ): + ''' + Returns a list of comments that match the API request parameters. + ''' + params_data = [ + # Required parameters + { + 'name': 'part', + 'value': part, + 'toString': lambda v: ','.join([p.value for p in v]), + }, + # Filter parameters + { + 'name': 'id', + 'value': id, + 'toString': lambda v: ','.join(v), + 'group': 'filter', + }, + { + 'name': 'parentId', + 'value': parent_id, + 'group': 'filter', + }, + # Optional parameters + { + 'name': 'maxResults', + 'value': max_results, + 'validator': ( + 'Acceptable values are 1 to 100', + lambda v: v >= 1 and v <= 100 + ) + }, + { + 'name': 'pageToken', + 'value': page_token, + }, + { + 'name': 'textFormat', + 'value': text_format, + 'toString': lambda v: v.value, + 'default': ParamTextFormat.PLAIN_TEXT + }, + ] + + params = normalize_params(params_data) + + youtube = googleapiclient.discovery.build( + self.api_service_name, self.api_version, developerKey=self.api_key) + + request = youtube.comments().list(**params) + response = request.execute() + return response diff --git a/flask/flaskr/__init__.py b/flask/flaskr/__init__.py new file mode 100644 index 0000000..df80a50 --- /dev/null +++ b/flask/flaskr/__init__.py @@ -0,0 +1,5 @@ +from flask import Flask + +app = Flask(__name__) + +from flaskr import views # noqa diff --git a/flask/flaskr/views.py b/flask/flaskr/views.py new file mode 100644 index 0000000..0221862 --- /dev/null +++ b/flask/flaskr/views.py @@ -0,0 +1,17 @@ +import os +from flaskr import app +from contentapi.youtube import comments + 
+ +@app.route('/youtube/videos/<id>/comments', methods=['PUT']) +def scan_youtube_video_comments(id: str): + API_KEY = os.environ.get('YOUTUBE_API_KEY', '') + comments_api = comments.Comments(API_KEY) + + # TODO: Change this + re = comments_api.list( + part={comments.ParamPart.ID, comments.ParamPart.SNIPPET}, + id={id}, + max_results=10 + ) + return re, 202 diff --git a/flask/requirements-dev.txt b/flask/requirements-dev.txt new file mode 100644 index 0000000..a32f854 --- /dev/null +++ b/flask/requirements-dev.txt @@ -0,0 +1,3 @@ +-r requirements.txt +flake8==3.9.2 +pylint==2.9.6 \ No newline at end of file diff --git a/flask/requirements.txt b/flask/requirements.txt new file mode 100644 index 0000000..d8726b2 --- /dev/null +++ b/flask/requirements.txt @@ -0,0 +1,5 @@ +Flask==2.0.1 +google-api-python-client==2.15.0 +google-auth-oauthlib==0.4.5 +google-auth-httplib2==0.1.0 +python-dotenv==0.19.0 \ No newline at end of file From 883fa6041e4a3b9b9d58e28fd7132cec43239861 Mon Sep 17 00:00:00 2001 From: NoamNol Date: Fri, 6 Aug 2021 14:13:36 +0300 Subject: [PATCH 02/13] Fix normalize_params --- flask/contentapi/helpers.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/flask/contentapi/helpers.py b/flask/contentapi/helpers.py index 5b40378..1af6e3e 100644 --- a/flask/contentapi/helpers.py +++ b/flask/contentapi/helpers.py @@ -3,9 +3,10 @@ def normalize_params(params_data: list[dict]) -> dict[str, str]: ''' - 1. Validate parameters value - 2. Remove missing parameters or use default value + 1. Remove missing parameters or use default value + 2. Validate parameters value 3. Force no more than one parameter in group (For example, only one 'filter' parameter) + 4. 
Convert value to string by simple str(value), or by custom toString function ''' result = {} groups_counter = Counter() @@ -13,7 +14,6 @@ def normalize_params(params_data: list[dict]) -> dict[str, str]: for p in params_data: name = p['name'] value = p['value'] - to_string = p.get('toString', str) group = p.get('group') validator = p.get('validator') @@ -32,5 +32,6 @@ def normalize_params(params_data: list[dict]) -> dict[str, str]: if (not is_valid(value)): raise ValueError(f"{name}:{value} is invalid parameter ({message})") + to_string = p.get('toString', str) result[name] = to_string(value) return result From bf4db3c7f4167005699cff67da5b805a61f46e2f Mon Sep 17 00:00:00 2001 From: NoamNol Date: Sun, 8 Aug 2021 04:47:03 +0300 Subject: [PATCH 03/13] Fix CommentThreads api --- flask/contentapi/youtube/comment_threads.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flask/contentapi/youtube/comment_threads.py b/flask/contentapi/youtube/comment_threads.py index 4469539..76a6522 100644 --- a/flask/contentapi/youtube/comment_threads.py +++ b/flask/contentapi/youtube/comment_threads.py @@ -97,6 +97,6 @@ def list(self, youtube = googleapiclient.discovery.build( self.api_service_name, self.api_version, developerKey=self.api_key) - request = youtube.commentthreads().list(**params) + request = youtube.commentThreads().list(**params) response = request.execute() return response From b128181c5c4d50a5f7c182e2959e2691ed2389f4 Mon Sep 17 00:00:00 2001 From: NoamNol Date: Sun, 8 Aug 2021 04:52:14 +0300 Subject: [PATCH 04/13] Api pagination --- flask/contentapi/helpers.py | 10 +++++--- flask/contentapi/youtube/comment_threads.py | 1 + flask/contentapi/youtube/comments.py | 1 + flask/contentapi/youtube/youtube_helpers.py | 28 +++++++++++++++++++++ flask/flaskr/bl.py | 28 +++++++++++++++++++++ flask/flaskr/views.py | 7 ++++-- flask/requirements.txt | 3 ++- 7 files changed, 72 insertions(+), 6 deletions(-) create mode 100644 
flask/contentapi/youtube/youtube_helpers.py create mode 100644 flask/flaskr/bl.py diff --git a/flask/contentapi/helpers.py b/flask/contentapi/helpers.py index 1af6e3e..c175fd4 100644 --- a/flask/contentapi/helpers.py +++ b/flask/contentapi/helpers.py @@ -4,9 +4,10 @@ def normalize_params(params_data: list[dict]) -> dict[str, str]: ''' 1. Remove missing parameters or use default value - 2. Validate parameters value - 3. Force no more than one parameter in group (For example, only one 'filter' parameter) - 4. Convert value to string by simple str(value), or by custom toString function + 2. Force no more than one parameter in group (For example, only one 'filter' parameter) + 3. Fix value with a middleware function + 4. Validate parameters value + 5. Convert value to string with simple str(value), or with custom toString function ''' result = {} groups_counter = Counter() @@ -15,6 +16,7 @@ def normalize_params(params_data: list[dict]) -> dict[str, str]: name = p['name'] value = p['value'] group = p.get('group') + middleware = p.get('middleware') validator = p.get('validator') if value is None: @@ -27,6 +29,8 @@ def normalize_params(params_data: list[dict]) -> dict[str, str]: groups_counter[group] += 1 if groups_counter[group] > 1: raise ValueError(f"Must specify exactly one {group} parameter") + if middleware: + value = middleware(value) if validator: message, is_valid = validator if (not is_valid(value)): diff --git a/flask/contentapi/youtube/comment_threads.py b/flask/contentapi/youtube/comment_threads.py index 76a6522..536730f 100644 --- a/flask/contentapi/youtube/comment_threads.py +++ b/flask/contentapi/youtube/comment_threads.py @@ -66,6 +66,7 @@ def list(self, { 'name': 'maxResults', 'value': max_results, + 'middleware': lambda v: min(v, 100), 'validator': ( 'Acceptable values are 1 to 100', lambda v: v >= 1 and v <= 100 diff --git a/flask/contentapi/youtube/comments.py b/flask/contentapi/youtube/comments.py index 982f1bf..e92cfdd 100644 --- 
a/flask/contentapi/youtube/comments.py +++ b/flask/contentapi/youtube/comments.py @@ -58,6 +58,7 @@ def list(self, { 'name': 'maxResults', 'value': max_results, + 'middleware': lambda v: min(v, 100), 'validator': ( 'Acceptable values are 1 to 100', lambda v: v >= 1 and v <= 100 diff --git a/flask/contentapi/youtube/youtube_helpers.py b/flask/contentapi/youtube/youtube_helpers.py new file mode 100644 index 0000000..89680fb --- /dev/null +++ b/flask/contentapi/youtube/youtube_helpers.py @@ -0,0 +1,28 @@ +from typing import Callable, Iterator + + +def api_pages_iterator(api_func: Callable, *args, **kwargs) -> Iterator: + ''' + Iterate the api pages. + Each page contains 'items' and may contain 'nextPageToken'. + + Note: + 'max_results' in kwargs will be removed from the api call, + and used instead as the max number of items to return from all pages. + If not specified, default to 999,999,999. + ''' + max_items = kwargs.get('max_results') or 999_999_999 + items_count = 0 + api_kwargs = kwargs.copy() + while items_count < max_items: + api_kwargs['max_results'] = max_items - items_count + response = api_func(*args, **api_kwargs) + yield response + items = response.get('items') + if isinstance(items, list): + items_count += len(items) + next_page_token = response.get('nextPageToken') + if not next_page_token: + break + else: + api_kwargs['page_token'] = next_page_token diff --git a/flask/flaskr/bl.py b/flask/flaskr/bl.py new file mode 100644 index 0000000..87645d9 --- /dev/null +++ b/flask/flaskr/bl.py @@ -0,0 +1,28 @@ +from glom import glom + +from contentapi.youtube import comment_threads +from contentapi.youtube.youtube_helpers import api_pages_iterator + + +def run_all(video_id: str, api_key: str, max_items: int = None): + return keep_all_video_comment_thread_ids(video_id, api_key, max_items) + + +def keep_all_video_comment_thread_ids(video_id: str, api_key: str, max_items: int = None): + all_comment_thread_ids = [] + comment_threads_api = 
comment_threads.CommentThreads(api_key) + pages_iterator = api_pages_iterator( + comment_threads_api.list, + max_results=max_items, + part={comment_threads.ParamPart.ID}, + video_id=video_id, + ) + # TODO: handle errors + for page in pages_iterator: + page_ids: list[str] = glom(page, ('items', ['id']), default=[]) + if not page_ids: + # TODO: log warning about empty page + pass + else: + all_comment_thread_ids.extend(page_ids) + return all_comment_thread_ids diff --git a/flask/flaskr/views.py b/flask/flaskr/views.py index 0221862..e3c0e00 100644 --- a/flask/flaskr/views.py +++ b/flask/flaskr/views.py @@ -1,12 +1,15 @@ import os from flaskr import app -from contentapi.youtube import comments +from flask import request +from .bl import run_all @app.route('/youtube/videos/<id>/comments', methods=['PUT']) def scan_youtube_video_comments(id: str): + max_items = request.args.get('max', None, int) API_KEY = os.environ.get('YOUTUBE_API_KEY', '') - comments_api = comments.Comments(API_KEY) + re = run_all(video_id=id, api_key=API_KEY, max_items=max_items) + return {'res': re}, 202 # TODO: Change this re = comments_api.list( diff --git a/flask/requirements.txt b/flask/requirements.txt index d8726b2..e7f339d 100644 --- a/flask/requirements.txt +++ b/flask/requirements.txt @@ -2,4 +2,5 @@ Flask==2.0.1 google-api-python-client==2.15.0 google-auth-oauthlib==0.4.5 google-auth-httplib2==0.1.0 -python-dotenv==0.19.0 \ No newline at end of file +python-dotenv==0.19.0 +glom==20.11.0 \ No newline at end of file From 6db88184c2543c19cef4f16ada3efc7b78cc8610 Mon Sep 17 00:00:00 2001 From: NoamNol Date: Sun, 8 Aug 2021 04:55:25 +0300 Subject: [PATCH 05/13] VSCode debug settings --- flask/.vscode/launch.json | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) create mode 100644 flask/.vscode/launch.json diff --git a/flask/.vscode/launch.json b/flask/.vscode/launch.json new file mode 100644 index 0000000..66754d0 --- /dev/null +++ b/flask/.vscode/launch.json @@ -0,0 +1,23 @@ +{ + // 
Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "name": "Python: Flask", + "type": "python", + "request": "launch", + "module": "flask", + "env": { + "FLASK_APP": "app.py", + "FLASK_ENV": "development" + }, + "args": [ + "run", + "--no-debugger" + ], + "jinja": true + } + ] +} \ No newline at end of file From 9808ca69072f91c30ee8803d6e3b5f3126dfc440 Mon Sep 17 00:00:00 2001 From: NoamNol Date: Sun, 8 Aug 2021 11:35:05 +0300 Subject: [PATCH 06/13] Handle errors in pagination --- flask/contentapi/youtube/youtube_helpers.py | 11 +++++++++-- flask/flaskr/bl.py | 6 ++++-- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/flask/contentapi/youtube/youtube_helpers.py b/flask/contentapi/youtube/youtube_helpers.py index 89680fb..ad21201 100644 --- a/flask/contentapi/youtube/youtube_helpers.py +++ b/flask/contentapi/youtube/youtube_helpers.py @@ -16,8 +16,15 @@ def api_pages_iterator(api_func: Callable, *args, **kwargs) -> Iterator: api_kwargs = kwargs.copy() while items_count < max_items: api_kwargs['max_results'] = max_items - items_count - response = api_func(*args, **api_kwargs) - yield response + err = None + response = None + try: + response = api_func(*args, **api_kwargs) + except Exception as e: + err = e + yield response, err + if err or not response: + break items = response.get('items') if isinstance(items, list): items_count += len(items) diff --git a/flask/flaskr/bl.py b/flask/flaskr/bl.py index 87645d9..c8df6dc 100644 --- a/flask/flaskr/bl.py +++ b/flask/flaskr/bl.py @@ -17,8 +17,10 @@ def keep_all_video_comment_thread_ids(video_id: str, api_key: str, max_items: in part={comment_threads.ParamPart.ID}, video_id=video_id, ) - # TODO: handle errors - for page in pages_iterator: + for page, err in pages_iterator: + if err: + # TODO: handle error + break page_ids: 
list[str] = glom(page, ('items', ['id']), default=[]) if not page_ids: # TODO: log warning about empty page From 125c48c0505bc3bdbe489d3fba71bb695e9dd7c0 Mon Sep 17 00:00:00 2001 From: NoamNol Date: Sun, 8 Aug 2021 21:50:36 +0300 Subject: [PATCH 07/13] Get comment replies concurrently using asyncio --- flask/contentapi/youtube/comment_threads.py | 26 ++--- flask/contentapi/youtube/comments.py | 40 ++++++-- flask/contentapi/youtube/youtube_helpers.py | 11 ++- flask/flaskr/bl.py | 102 +++++++++++++++++--- flask/flaskr/db.py | 2 + flask/flaskr/views.py | 20 ++-- flask/models/__init__.py | 0 flask/models/youtube.py | 13 +++ flask/requirements.txt | 1 + 9 files changed, 164 insertions(+), 51 deletions(-) create mode 100644 flask/flaskr/db.py create mode 100644 flask/models/__init__.py create mode 100644 flask/models/youtube.py diff --git a/flask/contentapi/youtube/comment_threads.py b/flask/contentapi/youtube/comment_threads.py index 536730f..433e94f 100644 --- a/flask/contentapi/youtube/comment_threads.py +++ b/flask/contentapi/youtube/comment_threads.py @@ -1,5 +1,6 @@ import googleapiclient.discovery import googleapiclient.errors +import asyncio from enum import Enum from .base import BaseApi @@ -30,16 +31,17 @@ class CommentThreads(BaseApi): def __init__(self, api_key: str): super().__init__(api_key=api_key) - def list(self, - part: set[ParamPart], - max_results: int = None, - order: ParamOrder = None, - page_token: str = None, - search_terms: str = None, - text_format: ParamTextFormat = None, - id: set[str] = None, - video_id: str = None - ): + async def list( + self, + part: set[ParamPart], + max_results: int = None, + order: ParamOrder = None, + page_token: str = None, + search_terms: str = None, + text_format: ParamTextFormat = None, + id: set[str] = None, + video_id: str = None + ): ''' Returns a list of comment threads that match the API request parameters. 
''' @@ -99,5 +101,7 @@ def list(self, self.api_service_name, self.api_version, developerKey=self.api_key) request = youtube.commentThreads().list(**params) - response = request.execute() + loop = asyncio.get_event_loop() + future = loop.run_in_executor(None, request.execute) + response = await future return response diff --git a/flask/contentapi/youtube/comments.py b/flask/contentapi/youtube/comments.py index e92cfdd..605ec60 100644 --- a/flask/contentapi/youtube/comments.py +++ b/flask/contentapi/youtube/comments.py @@ -1,9 +1,12 @@ import googleapiclient.discovery import googleapiclient.errors +import asyncio from enum import Enum +from glom import glom from .base import BaseApi from contentapi.helpers import normalize_params +from models.youtube import YoutubeComment class ParamPart(Enum): @@ -24,14 +27,15 @@ class Comments(BaseApi): def __init__(self, api_key: str): super().__init__(api_key=api_key) - def list(self, - part: set[ParamPart], - max_results: int = None, - page_token: str = None, - text_format: ParamTextFormat = None, - id: set[str] = None, - parent_id: str = None - ): + async def list( + self, + part: set[ParamPart], + max_results: int = None, + page_token: str = None, + text_format: ParamTextFormat = None, + id: set[str] = None, + parent_id: str = None + ): ''' Returns a list of comments that match the API request parameters. 
''' @@ -82,5 +86,23 @@ def list(self, self.api_service_name, self.api_version, developerKey=self.api_key) request = youtube.comments().list(**params) - response = request.execute() + loop = asyncio.get_event_loop() + future = loop.run_in_executor(None, request.execute) + response = await future return response + + +def parse_api_comment(api_comment: dict) -> YoutubeComment: + if not api_comment: + api_comment = {} + yc = YoutubeComment( + comment_id=api_comment.get('id', ''), + video_id=glom(api_comment, 'snippet.videoId', default=''), + text_original=glom(api_comment, 'snippet.textOriginal', default=''), + parent_id=glom(api_comment, 'snippet.parentId', default=''), + author_channel_id=glom(api_comment, 'snippet.authorChannelId.value', default=''), + like_count=glom(api_comment, 'snippet.likeCount', default=0), + published_at=glom(api_comment, 'snippet.publishedAt', default=''), + updated_at=glom(api_comment, 'snippet.updatedAt', default=''), + ) + return yc diff --git a/flask/contentapi/youtube/youtube_helpers.py b/flask/contentapi/youtube/youtube_helpers.py index ad21201..72f9184 100644 --- a/flask/contentapi/youtube/youtube_helpers.py +++ b/flask/contentapi/youtube/youtube_helpers.py @@ -1,7 +1,7 @@ -from typing import Callable, Iterator +from typing import Callable -def api_pages_iterator(api_func: Callable, *args, **kwargs) -> Iterator: +async def api_pages_iterator(api_func: Callable, *args, **kwargs): ''' Iterate the api pages. Each page contains 'items' and may contain 'nextPageToken'. 
@@ -19,13 +19,14 @@ def api_pages_iterator(api_func: Callable, *args, **kwargs) -> Iterator: err = None response = None try: - response = api_func(*args, **api_kwargs) + response = await api_func(*args, **api_kwargs) except Exception as e: err = e - yield response, err + response = response or {} + items = response.get('items', []) + yield items, err if err or not response: break - items = response.get('items') if isinstance(items, list): items_count += len(items) next_page_token = response.get('nextPageToken') diff --git a/flask/flaskr/bl.py b/flask/flaskr/bl.py index c8df6dc..df4075d 100644 --- a/flask/flaskr/bl.py +++ b/flask/flaskr/bl.py @@ -1,30 +1,102 @@ +import asyncio from glom import glom -from contentapi.youtube import comment_threads +from contentapi.youtube import comment_threads, comments from contentapi.youtube.youtube_helpers import api_pages_iterator +from .db import insert_many_comments +WORKERS_NUM = 13 -def run_all(video_id: str, api_key: str, max_items: int = None): - return keep_all_video_comment_thread_ids(video_id, api_key, max_items) +async def run_all(video_id: str, api_key: str, max_items: int = None): + if not max_items: + max_items = 999_999_999 + shared_data = {'comments_count': 0} + top_level_comment_queue = asyncio.Queue(maxsize=max_items or 0) -def keep_all_video_comment_thread_ids(video_id: str, api_key: str, max_items: int = None): - all_comment_thread_ids = [] + # Start replies workers, they wait to get ids from the top_level_comment_queue, + # fetch replies from the api and save content to db + replies_workers = [asyncio.create_task(replies_worker( + top_level_comment_queue, api_key, shared_data, max_items)) for _ in range(WORKERS_NUM)] + + # Read top level comments, save content to db and fill the top_level_comment_queue + await get_all_top_level_comments( + top_level_comment_queue, video_id, api_key, shared_data, max_items) + await asyncio.gather(*replies_workers, return_exceptions=True) + + +async def 
get_all_top_level_comments( + top_level_comment_queue: asyncio.Queue, + video_id: str, + api_key: str, + shared_data: dict, + max_items: int +): comment_threads_api = comment_threads.CommentThreads(api_key) pages_iterator = api_pages_iterator( comment_threads_api.list, max_results=max_items, - part={comment_threads.ParamPart.ID}, + part={comment_threads.ParamPart.ID, comment_threads.ParamPart.SNIPPET}, + order=comment_threads.ParamOrder.RELEVANCE, video_id=video_id, ) - for page, err in pages_iterator: + async for items, err in pages_iterator: if err: - # TODO: handle error + break # TODO: handle error + top_level_comments = [comments.parse_api_comment( + glom(item, 'snippet.topLevelComment')) for item in items] + if not top_level_comments: + continue + if shared_data['comments_count'] >= max_items: break - page_ids: list[str] = glom(page, ('items', ['id']), default=[]) - if not page_ids: - # TODO: log warning about empty page - pass - else: - all_comment_thread_ids.extend(page_ids) - return all_comment_thread_ids + comments_left: int = max_items - shared_data['comments_count'] + top_level_comments_to_send = top_level_comments[:comments_left] + shared_data['comments_count'] += len(top_level_comments_to_send) + + # Save in db + insert_many_comments(top_level_comments_to_send) + + # Send to queue + for tl_comment in top_level_comments_to_send: + await top_level_comment_queue.put(tl_comment.comment_id) + + +async def get_all_comment_replies( + comment_id: str, + api_key: str, + shared_data: dict, + max_items: int +): + comments_api = comments.Comments(api_key) + pages_iterator = api_pages_iterator( + comments_api.list, + max_results=max_items, + part={comments.ParamPart.ID, comments.ParamPart.SNIPPET}, + parent_id=comment_id, + ) + async for items, err in pages_iterator: + if err: + break # TODO: handle error + replies = [comments.parse_api_comment(item) for item in items] + if not replies: + continue + if shared_data['comments_count'] >= max_items: + break + 
comments_left: int = max_items - shared_data['comments_count'] + replies_to_send = replies[:comments_left] + shared_data['comments_count'] += len(replies_to_send) + + # Save in db + insert_many_comments(replies_to_send) + + +async def replies_worker( + top_level_comment_queue: asyncio.Queue, + api_key: str, + shared_data: dict, + max_items: int +): + while shared_data['comments_count'] < max_items: + comment_id = await top_level_comment_queue.get() + await get_all_comment_replies(comment_id, api_key, shared_data, max_items) + top_level_comment_queue.task_done() diff --git a/flask/flaskr/db.py b/flask/flaskr/db.py new file mode 100644 index 0000000..af49853 --- /dev/null +++ b/flask/flaskr/db.py @@ -0,0 +1,2 @@ +def insert_many_comments(many): + pass diff --git a/flask/flaskr/views.py b/flask/flaskr/views.py index e3c0e00..3aa00bb 100644 --- a/flask/flaskr/views.py +++ b/flask/flaskr/views.py @@ -1,20 +1,18 @@ import os +import time from flaskr import app from flask import request from .bl import run_all @app.route('/youtube/videos/<id>/comments', methods=['PUT']) -def scan_youtube_video_comments(id: str): +async def scan_youtube_video_comments(id: str): max_items = request.args.get('max', None, int) API_KEY = os.environ.get('YOUTUBE_API_KEY', '') - re = run_all(video_id=id, api_key=API_KEY, max_items=max_items) - return {'res': re}, 202 - - # TODO: Change this - re = comments_api.list( - part={comments.ParamPart.ID, comments.ParamPart.SNIPPET}, - id={id}, - max_results=10 - ) - return re, 202 + start_time = time.perf_counter() + try: + await run_all(video_id=id, api_key=API_KEY, max_items=max_items) + elapsed = time.perf_counter() - start_time + return {"message": "Accepted", "time": elapsed}, 202 + except Exception: + return {"message": "Failed"}, 500 diff --git a/flask/models/__init__.py b/flask/models/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/flask/models/youtube.py b/flask/models/youtube.py new file mode 100644 index 
0000000..acb7c32 --- 
/dev/null +++ b/flask/models/youtube.py @@ -0,0 +1,13 @@ +from dataclasses import dataclass + + +@dataclass +class YoutubeComment: + comment_id: str + video_id: str + text_original: str + parent_id: str + author_channel_id: str + like_count: int + published_at: str + updated_at: str diff --git a/flask/requirements.txt b/flask/requirements.txt index e7f339d..7bcd89d 100644 --- a/flask/requirements.txt +++ b/flask/requirements.txt @@ -1,4 +1,5 @@ Flask==2.0.1 +asgiref==3.4.1 google-api-python-client==2.15.0 google-auth-oauthlib==0.4.5 google-auth-httplib2==0.1.0 From 7ec6c20cb3bfdefa2b320510cf7921cd90563148 Mon Sep 17 00:00:00 2001 From: NoamNol Date: Wed, 11 Aug 2021 13:22:28 +0300 Subject: [PATCH 08/13] Refactor and fix dead producer async block --- .env | 1 + flask/{.env.example => .env.dev} | 4 +- flask/.flake8 | 2 +- flask/contentapi/youtube/comments.py | 4 +- flask/flaskr/bl.py | 85 +++++++++++++++++++--------- flask/flaskr/db.py | 2 +- flask/flaskr/views.py | 14 +++-- flask/models/youtube.py | 3 +- 8 files changed, 78 insertions(+), 37 deletions(-) create mode 100644 .env rename flask/{.env.example => .env.dev} (73%) diff --git a/.env b/.env new file mode 100644 index 0000000..e2e0ff2 --- /dev/null +++ b/.env @@ -0,0 +1 @@ +ASYNC_WORKERS=13 diff --git a/flask/.env.example b/flask/.env.dev similarity index 73% rename from flask/.env.example rename to flask/.env.dev index 2d55f53..405b177 100644 --- a/flask/.env.example +++ b/flask/.env.dev @@ -1,5 +1,5 @@ FLASK_ENV=development -FLASK_APP=flaskr - YOUTUBE_API_KEY=A...z + +ASYNC_WORKERS=13 diff --git a/flask/.flake8 b/flask/.flake8 index 76df9f7..f69503b 100644 --- a/flask/.flake8 +++ b/flask/.flake8 @@ -1,5 +1,5 @@ [flake8] -ignore = E402 +ignore = E402, W504 exclude = .git,__pycache__,docs/conf.py,old,build,dist,env max-line-length = 100 \ No newline at end of file diff --git a/flask/contentapi/youtube/comments.py b/flask/contentapi/youtube/comments.py index 605ec60..4d58dda 100644 --- 
a/flask/contentapi/youtube/comments.py +++ b/flask/contentapi/youtube/comments.py @@ -99,10 +99,10 @@ def parse_api_comment(api_comment: dict) -> YoutubeComment: comment_id=api_comment.get('id', ''), video_id=glom(api_comment, 'snippet.videoId', default=''), text_original=glom(api_comment, 'snippet.textOriginal', default=''), - parent_id=glom(api_comment, 'snippet.parentId', default=''), + parent_id=glom(api_comment, 'snippet.parentId', default=None), author_channel_id=glom(api_comment, 'snippet.authorChannelId.value', default=''), like_count=glom(api_comment, 'snippet.likeCount', default=0), published_at=glom(api_comment, 'snippet.publishedAt', default=''), - updated_at=glom(api_comment, 'snippet.updatedAt', default=''), + updated_at=glom(api_comment, 'snippet.updatedAt', default='') ) return yc diff --git a/flask/flaskr/bl.py b/flask/flaskr/bl.py index df4075d..1aa7300 100644 --- a/flask/flaskr/bl.py +++ b/flask/flaskr/bl.py @@ -1,28 +1,33 @@ import asyncio +import os +import sys from glom import glom from contentapi.youtube import comment_threads, comments from contentapi.youtube.youtube_helpers import api_pages_iterator -from .db import insert_many_comments +from models.youtube import YoutubeComment +from . 
import db -WORKERS_NUM = 13 - -async def run_all(video_id: str, api_key: str, max_items: int = None): +async def scan_youtube_video_comments(video_id: str, api_key: str, max_items: int = None): + WORKERS_NUM = int(os.environ.get('ASYNC_WORKERS', 10)) if not max_items: max_items = 999_999_999 - shared_data = {'comments_count': 0} + shared_data = {'comments_count': 0, 'replies_count': 0, 'producer_alive': True} top_level_comment_queue = asyncio.Queue(maxsize=max_items or 0) # Start replies workers, they wait to get ids from the top_level_comment_queue, - # fetch replies from the api and save content to db + # fetch replies from the api and save to db replies_workers = [asyncio.create_task(replies_worker( top_level_comment_queue, api_key, shared_data, max_items)) for _ in range(WORKERS_NUM)] - # Read top level comments, save content to db and fill the top_level_comment_queue + # Read top level comments, save to db and fill the top_level_comment_queue await get_all_top_level_comments( top_level_comment_queue, video_id, api_key, shared_data, max_items) + shared_data['producer_alive'] = False + await asyncio.gather(*replies_workers, return_exceptions=True) + return shared_data['comments_count'], shared_data['replies_count'] async def get_all_top_level_comments( @@ -36,33 +41,46 @@ async def get_all_top_level_comments( pages_iterator = api_pages_iterator( comment_threads_api.list, max_results=max_items, - part={comment_threads.ParamPart.ID, comment_threads.ParamPart.SNIPPET}, + part={ + comment_threads.ParamPart.ID, + comment_threads.ParamPart.SNIPPET, + comment_threads.ParamPart.REPLIES}, order=comment_threads.ParamOrder.RELEVANCE, video_id=video_id, ) async for items, err in pages_iterator: if err: - break # TODO: handle error - top_level_comments = [comments.parse_api_comment( + print(err, file=sys.stderr) # TODO: handle error + break + top_level_comments: list[YoutubeComment] = [comments.parse_api_comment( glom(item, 'snippet.topLevelComment')) for item in items] + + 
# Count how many replies each top_level_comment has + replies_counts = [len(glom(item, 'replies.comments', default=[])) for item in items] + if not top_level_comments: continue if shared_data['comments_count'] >= max_items: break + + # To save less comments than the number of comments left until max_items comments_left: int = max_items - shared_data['comments_count'] - top_level_comments_to_send = top_level_comments[:comments_left] - shared_data['comments_count'] += len(top_level_comments_to_send) + top_level_comments_to_save = top_level_comments[:comments_left] + shared_data['comments_count'] += len(top_level_comments_to_save) - # Save in db - insert_many_comments(top_level_comments_to_send) + db.insert_many_youtube_comments(top_level_comments_to_save) - # Send to queue - for tl_comment in top_level_comments_to_send: - await top_level_comment_queue.put(tl_comment.comment_id) + # YouTube API returns CommentThread with only part of the full replies list, + # so we can't save all replies at this point. + # Therefore, we send the top comments to the queue to handle replies reading later. 
+ for tl_comment, replies_count in zip(top_level_comments_to_save, replies_counts): + if replies_count > 0: + await top_level_comment_queue.put((tl_comment.comment_id, tl_comment.video_id)) async def get_all_comment_replies( comment_id: str, + video_id: str, api_key: str, shared_data: dict, max_items: int @@ -76,18 +94,26 @@ async def get_all_comment_replies( ) async for items, err in pages_iterator: if err: - break # TODO: handle error - replies = [comments.parse_api_comment(item) for item in items] + print(err, file=sys.stderr) # TODO: handle error + break + replies: list[YoutubeComment] = [comments.parse_api_comment(item) for item in items] + + # Add video_id field (youtube api omits the videoId from the replies) + for r in replies: + r.video_id = video_id + if not replies: continue if shared_data['comments_count'] >= max_items: break + + # To save less comments than the number of comments left until max_items comments_left: int = max_items - shared_data['comments_count'] - replies_to_send = replies[:comments_left] - shared_data['comments_count'] += len(replies_to_send) + replies_to_save = replies[:comments_left] + shared_data['comments_count'] += len(replies_to_save) + shared_data['replies_count'] += len(replies_to_save) - # Save in db - insert_many_comments(replies_to_send) + db.insert_many_youtube_comments(replies_to_save) async def replies_worker( @@ -97,6 +123,13 @@ async def replies_worker( max_items: int ): while shared_data['comments_count'] < max_items: - comment_id = await top_level_comment_queue.get() - await get_all_comment_replies(comment_id, api_key, shared_data, max_items) - top_level_comment_queue.task_done() + try: + comment_id, video_id = top_level_comment_queue.get_nowait() + await get_all_comment_replies(comment_id, video_id, api_key, shared_data, max_items) + top_level_comment_queue.task_done() + except asyncio.QueueEmpty: + if shared_data['producer_alive']: + await asyncio.sleep(0.2) + continue + else: + break diff --git 
a/flask/flaskr/db.py b/flask/flaskr/db.py index af49853..ee000b9 100644 --- a/flask/flaskr/db.py +++ b/flask/flaskr/db.py @@ -1,2 +1,2 @@ -def insert_many_comments(many): +def insert_many_youtube_comments(many): pass diff --git a/flask/flaskr/views.py b/flask/flaskr/views.py index 3aa00bb..b8272bc 100644 --- a/flask/flaskr/views.py +++ b/flask/flaskr/views.py @@ -2,17 +2,23 @@ import time from flaskr import app from flask import request -from .bl import run_all +from . import bl @app.route('/youtube/videos//comments', methods=['PUT']) async def scan_youtube_video_comments(id: str): max_items = request.args.get('max', None, int) - API_KEY = os.environ.get('YOUTUBE_API_KEY', '') + api_key = os.environ.get('YOUTUBE_API_KEY', '') start_time = time.perf_counter() try: - await run_all(video_id=id, api_key=API_KEY, max_items=max_items) + comments_count, replies_count = await bl.scan_youtube_video_comments( + video_id=id, api_key=api_key, max_items=max_items) elapsed = time.perf_counter() - start_time - return {"message": "Accepted", "time": elapsed}, 202 + return { + "message": "Accepted", + "time": elapsed, + 'comments_count': comments_count, + 'replies_count': replies_count, + }, 202 except Exception: return {"message": "Failed"}, 500 diff --git a/flask/models/youtube.py b/flask/models/youtube.py index acb7c32..de2a06a 100644 --- a/flask/models/youtube.py +++ b/flask/models/youtube.py @@ -1,4 +1,5 @@ from dataclasses import dataclass +from typing import Optional @dataclass @@ -6,7 +7,7 @@ class YoutubeComment: comment_id: str video_id: str text_original: str - parent_id: str + parent_id: Optional[str] author_channel_id: str like_count: int published_at: str From c5a7ce9f9923f52fc77b54f545cfa4e3d5d62fd1 Mon Sep 17 00:00:00 2001 From: NoamNol Date: Wed, 11 Aug 2021 14:22:22 +0300 Subject: [PATCH 09/13] Add MongoDB --- README.md | 45 ++++++++++++++++++++++++++++ flask/.env.dev | 6 ++++ flask/flaskr/db.py | 66 ++++++++++++++++++++++++++++++++++++++++-- 
flask/requirements.txt | 1 + mongo/.env.dev | 5 ++++ mongo/Dockerfile | 3 ++ mongo/init_mongo.sh | 23 +++++++++++++++ 7 files changed, 147 insertions(+), 2 deletions(-) create mode 100644 mongo/.env.dev create mode 100644 mongo/Dockerfile create mode 100644 mongo/init_mongo.sh diff --git a/README.md b/README.md index e7f4ecc..5d045ef 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,47 @@ # Findevil Find evil content in YouTube comments + +## Development +### Requirements +- Python 3.9 +### Build and run MongoDB for development +```powershell +cd mongo +docker build -t mongo-findevil:latest . +docker run --name mongodb-dev -p 27018:27017 --env-file .env.dev -d mongo-findevil:latest +``` + +Use port `27018` to connect to the `dev` database: +
+`mongodb://mongodbadmin:temp_admin_password@localhost:27018` + +### Build and run Flask + +Windows: +```powershell +cd flask +copy .env.dev .env +# edit your .env file... +py -m venv env +.\env\Scripts\activate +pip install -r requirements-dev.txt +flask run +``` + +Linux: +```shell +cd flask +cp .env.dev .env +# edit your .env file... +python3 -m venv env +source env/bin/activate +pip install -r requirements-dev.txt +flask run +``` + +Use port `5000` to send requests to the `dev` server: +
+`http://127.0.0.1:5000/youtube/videos//comments` `[PUT]` + +### Debugging +To debug Flask, instead of `flask run` use the debugger in VSCode. \ No newline at end of file diff --git a/flask/.env.dev b/flask/.env.dev index 405b177..16c3bc2 100644 --- a/flask/.env.dev +++ b/flask/.env.dev @@ -3,3 +3,9 @@ FLASK_ENV=development YOUTUBE_API_KEY=A...z ASYNC_WORKERS=13 + +MONGODB_DATABASE=flaskdb +MONGODB_USERNAME=mongodbuser +MONGODB_PASSWORD=temp_password +MONGODB_HOSTNAME=localhost +MONGODB_PORT=27018 \ No newline at end of file diff --git a/flask/flaskr/db.py b/flask/flaskr/db.py index ee000b9..36a720f 100644 --- a/flask/flaskr/db.py +++ b/flask/flaskr/db.py @@ -1,2 +1,64 @@ -def insert_many_youtube_comments(many): - pass +import os +from flask import g +from pymongo import MongoClient, UpdateOne +from dataclasses import asdict + +from flaskr import app +from models.youtube import YoutubeComment + + +def _get_db(): + if 'db' not in g: + database = os.environ.get("MONGODB_DATABASE", '') + username = os.environ.get("MONGODB_USERNAME", '') + password = os.environ.get("MONGODB_PASSWORD", '') + hostname = os.environ.get("MONGODB_HOSTNAME", '') + port = os.environ.get("MONGODB_PORT", '') + uri = f'mongodb://{username}:{password}@{hostname}:{port}/{database}' + client = MongoClient(uri) + g.db = client[database] + return g.db + + +@app.teardown_appcontext +def _teardown_db(exception=None): + ''' + Closes the db resource + See https://flask.palletsprojects.com/en/2.0.x/appcontext/ + ''' + db = g.pop('db', None) + if db: + try: + db.client.close() + except Exception: + pass + + +def _remove_none_keys(a_dict: dict): + return {k: v for k, v in a_dict.items() if v is not None} + + +def _rename_dict_key(a_dict, old_key, new_key): + a_dict[new_key] = a_dict.pop(old_key) + + +def _dataclass_to_dict(dataclass_obj, key_as_id: str = None): + as_dict = asdict(dataclass_obj) + as_dict = _remove_none_keys(as_dict) + if key_as_id: + _rename_dict_key(as_dict, key_as_id, '_id') + return 
as_dict + + +def _insert_or_update_many(dict_list: list[dict], ordered: bool = True): + db = _get_db() + requests = [UpdateOne({'_id': d['_id']}, {'$set': d}, upsert=True) for d in dict_list] + db.youtube.bulk_write(requests, ordered=ordered) + + +# ---------------- PUBLIC ---------------- + + +def insert_many_youtube_comments(many_comments: list[YoutubeComment]): + dict_list = [_dataclass_to_dict(d, key_as_id='comment_id') for d in many_comments] + _insert_or_update_many(dict_list, ordered=False) diff --git a/flask/requirements.txt b/flask/requirements.txt index 7bcd89d..699a074 100644 --- a/flask/requirements.txt +++ b/flask/requirements.txt @@ -1,5 +1,6 @@ Flask==2.0.1 asgiref==3.4.1 +pymongo==3.12.0 google-api-python-client==2.15.0 google-auth-oauthlib==0.4.5 google-auth-httplib2==0.1.0 diff --git a/mongo/.env.dev b/mongo/.env.dev new file mode 100644 index 0000000..4d27ec9 --- /dev/null +++ b/mongo/.env.dev @@ -0,0 +1,5 @@ +MONGO_INITDB_ROOT_USERNAME=mongodbadmin +MONGO_INITDB_ROOT_PASSWORD=temp_admin_password +MONGO_INITDB_DATABASE=flaskdb +flaskdbUser=mongodbuser +flaskdbPwd=temp_password \ No newline at end of file diff --git a/mongo/Dockerfile b/mongo/Dockerfile new file mode 100644 index 0000000..e3b41a9 --- /dev/null +++ b/mongo/Dockerfile @@ -0,0 +1,3 @@ +FROM mongo:5.0.2 + +COPY ./init_mongo.sh /docker-entrypoint-initdb.d/ \ No newline at end of file diff --git a/mongo/init_mongo.sh b/mongo/init_mongo.sh new file mode 100644 index 0000000..a946e81 --- /dev/null +++ b/mongo/init_mongo.sh @@ -0,0 +1,23 @@ +#!/bin/bash +set -e + +# flaskdbUser is the userName used from application code to interact with databases +# and flaskdbPwd is the password for this user. +# MONGO_INITDB_ROOT_USERNAME & MONGO_INITDB_ROOT_PASSWORD are the config for db admin. +# admin user is expected to be already created when this script executes. +# We use it here to authenticate as admin to create flaskdbUser and databases. 
+ +echo ">>>>>>> trying to create database and users" +if [ -n "${MONGO_INITDB_ROOT_USERNAME:-}" ] && + [ -n "${MONGO_INITDB_ROOT_PASSWORD:-}" ] && + [ -n "${flaskdbUser:-}" ] && + [ -n "${flaskdbPwd:-}" ]; then +mongosh -u $MONGO_INITDB_ROOT_USERNAME -p $MONGO_INITDB_ROOT_PASSWORD< Date: Wed, 11 Aug 2021 19:19:21 +0300 Subject: [PATCH 10/13] Add docker-compose to run prod --- .env | 6 ++++++ README.md | 47 ++++++++++++++++++++++++++++++++++++++++++ docker-compose.yml | 50 +++++++++++++++++++++++++++++++++++++++++++++ flask/.dockerignore | 7 +++++++ flask/Dockerfile | 20 ++++++++++++++++++ 5 files changed, 130 insertions(+) create mode 100644 docker-compose.yml create mode 100644 flask/.dockerignore create mode 100644 flask/Dockerfile diff --git a/.env b/.env index e2e0ff2..d17d077 100644 --- a/.env +++ b/.env @@ -1 +1,7 @@ ASYNC_WORKERS=13 + +MONGODB_FLASK_USERNAME=mongodbuser +MONGODB_FLASK_PASSWORD=temp_password + +MONGODB_ADMIN_USERNAME=mongodbadmin +MONGODB_ADMIN_PASSWORD=temp_admin_password diff --git a/README.md b/README.md index 5d045ef..9a9c150 100644 --- a/README.md +++ b/README.md @@ -1,9 +1,56 @@ # Findevil Find evil content in YouTube comments +## Get started +```powershell +# in Linux replace 'set' with 'export' +set YOUTUBE_API_KEY= +docker-compose up -d --build +``` + +Before `docker-compose up`, you can set more optional settings: +```powershell +# in Linux replace 'set' with 'export' +set ASYNC_WORKERS= +set MONGODB_FLASK_USERNAME= +set MONGODB_FLASK_PASSWORD= +set MONGODB_ADMIN_USERNAME= +set MONGODB_ADMIN_PASSWORD= +``` + +### Send a request +> You can use tools like [Postman](https://www.postman.com/) to send HTTP requests. + +Scan all comments of a YouTube video: +
+`http://127.0.0.1:5001/youtube/videos//comments` `[PUT]` + +Or with `max` parameter: +
+`http://127.0.0.1:5001/youtube/videos//comments?max=` `[PUT]` + +### Connect to MongoDB +Now you can connect to the database and view the new data in `flaskdb` db. +
+The connection string is: +`mongodb://mongodbadmin:temp_admin_password@localhost:27019` + +> Change `mongodbadmin` and `temp_admin_password` +if you used custom `MONGODB_ADMIN_USERNAME` and `MONGODB_ADMIN_PASSWORD`. + +### Clean the database +To delete the users and data in MongoDB, run: +``` +docker-compose down +docker volume rm findevil_mongodbdata +``` + +Now you can start again with fresh db and run `docker-compose up` as described above. + ## Development ### Requirements - Python 3.9 +- Docker ### Build and run MongoDB for development ```powershell cd mongo diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..382d522 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,50 @@ +version: '3.7' + +services: + flask: + build: ./flask + container_name: flask + image: flask-findevil:latest + restart: unless-stopped + environment: + YOUTUBE_API_KEY: ${YOUTUBE_API_KEY} + ASYNC_WORKERS: ${ASYNC_WORKERS} + MONGODB_DATABASE: flaskdb + MONGODB_USERNAME: ${MONGODB_FLASK_USERNAME} + MONGODB_PASSWORD: ${MONGODB_FLASK_PASSWORD} + MONGODB_HOSTNAME: mongodb + MONGODB_PORT: 27017 # default port, used inside the network + depends_on: + - mongodb + ports: + - 5001:5000 + networks: + - backend + + mongodb: + build: ./mongo + container_name: mongodb + image: mongo-findevil:latest + restart: unless-stopped + environment: + MONGO_INITDB_ROOT_USERNAME: ${MONGODB_ADMIN_USERNAME} + MONGO_INITDB_ROOT_PASSWORD: ${MONGODB_ADMIN_PASSWORD} + MONGO_INITDB_DATABASE: flaskdb + flaskdbUser: ${MONGODB_FLASK_USERNAME} + flaskdbPwd: ${MONGODB_FLASK_PASSWORD} + MONGODB_DATA_DIR: /data/db + MONDODB_LOG_DIR: /dev/null + volumes: + # see https://stackoverflow.com/questions/54911021/unable-to-start-docker-mongo-image-on-windows + - mongodbdata:/data/db + networks: + - backend + ports: + - 27019:27017 + +networks: + backend: + driver: bridge +volumes: + mongodbdata: + driver: local \ No newline at end of file diff --git a/flask/.dockerignore b/flask/.dockerignore new file 
mode 100644 index 0000000..ad4a1a1 --- /dev/null +++ b/flask/.dockerignore @@ -0,0 +1,7 @@ +.vscode/ +env/ +__pycache__/ +*.py[cod] +.env +.gitignore +Dockerfile \ No newline at end of file diff --git a/flask/Dockerfile b/flask/Dockerfile new file mode 100644 index 0000000..e05f510 --- /dev/null +++ b/flask/Dockerfile @@ -0,0 +1,20 @@ +FROM python:3.9-slim-buster + +# Keeps Python from generating .pyc files in the container +ENV PYTHONDONTWRITEBYTECODE=1 + +# Turns off buffering for easier container logging +ENV PYTHONUNBUFFERED=1 + +WORKDIR /app +COPY . /app + +# Install pip requirements +RUN python -m pip install -r requirements.txt + +# Creates a non-root user with an explicit UID and adds permission to access the /app folder +# For more info, please refer to https://aka.ms/vscode-docker-python-configure-containers +RUN adduser -u 5678 --disabled-password --gecos "" appuser && chown -R appuser /app +USER appuser + +CMD ["flask", "run", "--host", "0.0.0.0"] \ No newline at end of file From 5560b0363fdb9539c5433d1424ece582e136214b Mon Sep 17 00:00:00 2001 From: NoamNol Date: Wed, 11 Aug 2021 19:20:01 +0300 Subject: [PATCH 11/13] Add todo list to README --- README.md | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 9a9c150..77c8ed5 100644 --- a/README.md +++ b/README.md @@ -91,4 +91,11 @@ Use port `5000` to send requests to the `dev` server: `http://127.0.0.1:5000/youtube/videos//comments` `[PUT]` ### Debugging -To debug Flask, instead of `flask run` use the debugger in VSCode. \ No newline at end of file +To debug Flask, instead of `flask run` use the debugger in VSCode. + +## Todo +- Find links in comments. +- Find evil content in comments. +- Consider using [Aiogoogle](https://github.com/omarryhan/aiogoogle) for better async. +- Add load balancing to handle multiple requests. 
+ From 83729d1ee5280ea0e25aa0642748d5ee680a3832 Mon Sep 17 00:00:00 2001 From: NoamNol Date: Wed, 11 Aug 2021 19:46:15 +0300 Subject: [PATCH 12/13] Refactor bl --- flask/flaskr/bl.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/flask/flaskr/bl.py b/flask/flaskr/bl.py index 1aa7300..c8ce90b 100644 --- a/flask/flaskr/bl.py +++ b/flask/flaskr/bl.py @@ -18,11 +18,11 @@ async def scan_youtube_video_comments(video_id: str, api_key: str, max_items: in # Start replies workers, they wait to get ids from the top_level_comment_queue, # fetch replies from the api and save to db - replies_workers = [asyncio.create_task(replies_worker( + replies_workers = [asyncio.create_task(_replies_worker( top_level_comment_queue, api_key, shared_data, max_items)) for _ in range(WORKERS_NUM)] # Read top level comments, save to db and fill the top_level_comment_queue - await get_all_top_level_comments( + await _get_all_top_level_comments( top_level_comment_queue, video_id, api_key, shared_data, max_items) shared_data['producer_alive'] = False @@ -30,7 +30,10 @@ async def scan_youtube_video_comments(video_id: str, api_key: str, max_items: in return shared_data['comments_count'], shared_data['replies_count'] -async def get_all_top_level_comments( +# ---------------- PRIVATE ---------------- + + +async def _get_all_top_level_comments( top_level_comment_queue: asyncio.Queue, video_id: str, api_key: str, @@ -63,7 +66,7 @@ async def get_all_top_level_comments( if shared_data['comments_count'] >= max_items: break - # To save less comments than the number of comments left until max_items + # Limit the comments number to the number of comments left until max_items comments_left: int = max_items - shared_data['comments_count'] top_level_comments_to_save = top_level_comments[:comments_left] shared_data['comments_count'] += len(top_level_comments_to_save) @@ -78,7 +81,7 @@ async def get_all_top_level_comments( await 
top_level_comment_queue.put((tl_comment.comment_id, tl_comment.video_id)) -async def get_all_comment_replies( +async def _get_all_comment_replies( comment_id: str, video_id: str, api_key: str, @@ -107,7 +110,7 @@ async def get_all_comment_replies( if shared_data['comments_count'] >= max_items: break - # To save less comments than the number of comments left until max_items + # Limit the comments number to the number of comments left until max_items comments_left: int = max_items - shared_data['comments_count'] replies_to_save = replies[:comments_left] shared_data['comments_count'] += len(replies_to_save) @@ -116,7 +119,7 @@ async def get_all_comment_replies( db.insert_many_youtube_comments(replies_to_save) -async def replies_worker( +async def _replies_worker( top_level_comment_queue: asyncio.Queue, api_key: str, shared_data: dict, @@ -125,7 +128,7 @@ async def replies_worker( while shared_data['comments_count'] < max_items: try: comment_id, video_id = top_level_comment_queue.get_nowait() - await get_all_comment_replies(comment_id, video_id, api_key, shared_data, max_items) + await _get_all_comment_replies(comment_id, video_id, api_key, shared_data, max_items) top_level_comment_queue.task_done() except asyncio.QueueEmpty: if shared_data['producer_alive']: From 1aabcff83c4b14ec04a620981d6f29d61e1ccd55 Mon Sep 17 00:00:00 2001 From: NoamNol Date: Thu, 12 Aug 2021 10:26:21 +0300 Subject: [PATCH 13/13] Find links in comments --- README.md | 6 ++--- flask/.flake8 | 2 +- flask/contentapi/content_type.py | 10 +++++++++ flask/flaskr/bl.py | 38 ++++++++++++++++++++++++++++++-- flask/flaskr/db.py | 31 +++++++++++++++++++++++++- flask/requirements.txt | 3 ++- 6 files changed, 82 insertions(+), 8 deletions(-) create mode 100644 flask/contentapi/content_type.py diff --git a/README.md b/README.md index 77c8ed5..528a4c7 100644 --- a/README.md +++ b/README.md @@ -94,8 +94,8 @@ Use port `5000` to send requests to the `dev` server: To debug Flask, instead of `flask run` use the 
debugger in VSCode. ## Todo -- Find links in comments. - Find evil content in comments. -- Consider using [Aiogoogle](https://github.com/omarryhan/aiogoogle) for better async. -- Add load balancing to handle multiple requests. +- Performance: Find links in comments by using another service. +- Performance: Consider using [Aiogoogle](https://github.com/omarryhan/aiogoogle) for better async. +- Performance: Add load balancing to handle multiple requests. diff --git a/flask/.flake8 b/flask/.flake8 index f69503b..8db2ba4 100644 --- a/flask/.flake8 +++ b/flask/.flake8 @@ -1,5 +1,5 @@ [flake8] -ignore = E402, W504 +ignore = E402, E123, W504 exclude = .git,__pycache__,docs/conf.py,old,build,dist,env max-line-length = 100 \ No newline at end of file diff --git a/flask/contentapi/content_type.py b/flask/contentapi/content_type.py new file mode 100644 index 0000000..e9feb35 --- /dev/null +++ b/flask/contentapi/content_type.py @@ -0,0 +1,10 @@ +from enum import Enum + + +class ContentType(Enum): + ''' + All types of content in Findevil. + + The value is the name of the corresponding table/collection in the db. + ''' + YOUTUBE_COMMENT = 'youtube' diff --git a/flask/flaskr/bl.py b/flask/flaskr/bl.py index c8ce90b..ff8d30b 100644 --- a/flask/flaskr/bl.py +++ b/flask/flaskr/bl.py @@ -1,9 +1,12 @@ import asyncio import os import sys +from urlextract import URLExtract from glom import glom +from typing import Tuple from contentapi.youtube import comment_threads, comments +from contentapi.content_type import ContentType from contentapi.youtube.youtube_helpers import api_pages_iterator from models.youtube import YoutubeComment from . 
import db @@ -33,6 +36,37 @@ async def scan_youtube_video_comments(video_id: str, api_key: str, max_items: in # ---------------- PRIVATE ---------------- +def _handle_youtube_comments(comments: list[YoutubeComment]): + if not comments: + return + links_in_comments = _find_links_in_comments(comments) + db.insert_many_youtube_comments(comments) + db.insert_many_content_links(links_in_comments) + + +def _find_links_in_comments( + comments: list[YoutubeComment] +) -> list[Tuple[str, db.ContentItemId]]: + ''' + Find links in Youtube comments, and bind each link to the comment it came from. + ''' + all_links = [] + for c in comments: + links_in_comment = _find_links_in_text(c.text_original) + links_with_to_content_id = [(link, db.ContentItemId( + content_id=c.comment_id, + content_type=ContentType.YOUTUBE_COMMENT) + ) for link in links_in_comment] + all_links.extend(links_with_to_content_id) + return all_links + + +def _find_links_in_text(text: str) -> list[str]: + extractor = URLExtract() + urls = extractor.find_urls(text, only_unique=True) + return urls + + async def _get_all_top_level_comments( top_level_comment_queue: asyncio.Queue, video_id: str, @@ -71,7 +105,7 @@ async def _get_all_top_level_comments( top_level_comments_to_save = top_level_comments[:comments_left] shared_data['comments_count'] += len(top_level_comments_to_save) - db.insert_many_youtube_comments(top_level_comments_to_save) + _handle_youtube_comments(top_level_comments_to_save) # YouTube API returns CommentThread with only part of the full replies list, # so we can't save all replies at this point. 
@@ -116,7 +150,7 @@ async def _get_all_comment_replies( shared_data['comments_count'] += len(replies_to_save) shared_data['replies_count'] += len(replies_to_save) - db.insert_many_youtube_comments(replies_to_save) + _handle_youtube_comments(replies_to_save) async def _replies_worker( diff --git a/flask/flaskr/db.py b/flask/flaskr/db.py index 36a720f..2665250 100644 --- a/flask/flaskr/db.py +++ b/flask/flaskr/db.py @@ -1,10 +1,26 @@ import os from flask import g from pymongo import MongoClient, UpdateOne -from dataclasses import asdict +from dataclasses import dataclass, asdict +from typing import Any, Tuple from flaskr import app from models.youtube import YoutubeComment +from contentapi.content_type import ContentType + + +@dataclass +class ContentItemId: + ''' + Identity of single content item in the db. + + The identity is simply the "collection name" + "document id" of the content item. + + For example, a YouTube comment is a content item, + and its identity is "youtube" + "comment id". 
+ ''' + content_type: ContentType + content_id: Any def _get_db(): @@ -62,3 +78,16 @@ def _insert_or_update_many(dict_list: list[dict], ordered: bool = True): def insert_many_youtube_comments(many_comments: list[YoutubeComment]): dict_list = [_dataclass_to_dict(d, key_as_id='comment_id') for d in many_comments] _insert_or_update_many(dict_list, ordered=False) + + +def insert_many_content_links(links: list[Tuple[str, ContentItemId]]): + if not links: + return + dict_list = [{ + 'link': link, + 'source_type': content_item_id.content_type.value, + 'source_id': content_item_id.content_id + } for link, content_item_id in links] + + db = _get_db() + db.links.insert_many(dict_list, ordered=False) diff --git a/flask/requirements.txt b/flask/requirements.txt index 699a074..94395ee 100644 --- a/flask/requirements.txt +++ b/flask/requirements.txt @@ -5,4 +5,5 @@ google-api-python-client==2.15.0 google-auth-oauthlib==0.4.5 google-auth-httplib2==0.1.0 python-dotenv==0.19.0 -glom==20.11.0 \ No newline at end of file +glom==20.11.0 +urlextract==1.3.0 \ No newline at end of file