Update YouTube._get_recommendations_for_home
- Use new Youtube.get_related_videos and local history
- Fix anxdpanic#508
MoojMidge committed Jan 7, 2024
1 parent 08b59a0 commit 5393dec
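
In outline, the reworked method seeds recommendations from watch history (local playback history, or the remote history playlist when local history is disabled), fetches related videos for each history entry on worker threads, and then pages and ranks the pooled results. Below is a minimal sketch of that flow, assuming a client that exposes get_related_videos as in the diff; recommendations_sketch and its parameters are illustrative stand-ins, not plugin API:

import threading
from itertools import chain

def recommendations_sketch(client, history_video_ids, per_page=50):
    # One bucket of related items per watched video.
    buckets = [[] for _ in history_video_ids]

    def fetch(idx, video_id):
        # get_related_videos is the helper this commit switches to;
        # only the first page worth of results is kept per source.
        related = client.get_related_videos(video_id)
        if related:
            buckets[idx] = related['items'][:per_page]

    threads = [
        threading.Thread(target=fetch, args=(i, vid), daemon=True)
        for i, vid in enumerate(history_video_ids)
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

    # The real method also assigns each item an 'order' slot and ranks
    # pages for channel diversity; here we simply flatten the buckets.
    return list(chain.from_iterable(buckets))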
Showing 2 changed files with 208 additions and 116 deletions.
292 changes: 192 additions & 100 deletions resources/lib/youtube_plugin/youtube/client/youtube.py
@@ -11,9 +11,10 @@
from __future__ import absolute_import, division, unicode_literals

import copy
import re
import threading
import xml.etree.ElementTree as ET
from itertools import chain
from random import randint

from .login_client import LoginClient
from ..helper.video_info import VideoInfo
@@ -369,129 +370,189 @@ def get_video_categories(self, page_token='', **kwargs):
**kwargs)

def _get_recommendations_for_home(self):
# YouTube has deprecated this API, so use history and related items to form
# a recommended set. We cache aggressively because searches incur a high
# quota cost of 100 on the YouTube API.
# Note this is a first stab attempt and can be refined a lot more.
# YouTube has deprecated this API, so we use history and related items
# to form a recommended set.
# We cache aggressively because searches can be slow.
# Note this is a naive implementation and can be refined a lot more.

# Do we have a cached result?
cache = self._context.get_data_cache()
# Caching of complete recommendation results currently disabled to allow
# for some variation in recommendations
# cache_home_key = 'get-activities-home'
# cached = cache.get_item(cache_home_key, cache.ONE_HOUR)
# if cached and cached['items']:
# return cached

payload = {
'kind': 'youtube#activityListResponse',
'items': []
}

local_history = self._context.get_settings().use_local_history()
history_id = self._context.get_access_manager().get_watch_history_id()
if not history_id or history_id == 'HL':
return payload

cache = self._context.get_data_cache()

# Do we have a cached result?
cache_home_key = 'get-activities-home'
cached = cache.get_item(cache_home_key, cache.ONE_HOUR * 4)
cached = cached and cached.get('items')
if cached:
return cached
if local_history:
history = self._context.get_playback_history()
video_ids = history.get_items()
else:
return payload
else:
history = self.get_playlist_items(history_id, max_results=50)
if history and 'items' in history:
history_items = history['items'] or []
video_ids = []
else:
return payload
for item in history_items:
try:
video_ids.append(item['snippet']['resourceId']['videoId'])
except KeyError:
continue

# Fetch existing list of items, if any
cache_items_key = 'get-activities-home-items'
cached = cache.get_item(cache_items_key, cache.ONE_WEEK * 2)
items = cached if cached else []
cached = cache.get_item(cache_items_key, cache.ONE_WEEK * 2) or []

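        # One bucket of related items per history video, plus shared
        # counters: '_counter' for overall ordering, '_pages' for
        # per-page quotas, and '_related' for tracking which source
        # videos have already contributed related items.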
items_per_page = self._max_results
items = [[] for _ in range(len(video_ids))]
counts = {
'_counter': 0,
'_pages': {},
'_related': {},
}

# Fetch history and recommended items. Use threads for faster execution.
def helper(video_id, responses):
            self._context.log_debug(
                'Method get_activities: doing expensive API fetch for related '
                'items for video %s' % video_id
            )
di = self.get_related_videos(video_id, max_results=10)
if 'items' in di:
# Record for which video we fetched the items
for item in di['items']:
item['plugin_fetched_for'] = video_id
responses.extend(di['items'])
def update_counts(items, item_store=None, original_ids=None):
if original_ids is not None:
original_ids = list(original_ids)

for item in items:
related = item['related_video_id']
channel = item['related_channel_id']
video_id = item['id']

counts['_related'].setdefault(related, 0)
counts['_related'][related] += 1

if video_id in counts:
item_count = counts[video_id]
item_count['related'].setdefault(related, 0)
item_count['related'][related] += 1
item_count['channels'].setdefault(channel, 0)
item_count['channels'][channel] += 1
else:
counts[video_id] = {
'related': {related: 1},
'channels': {channel: 1}
}
if item_store is None:
continue
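                # Slot the item into the bucket of the history video it
                # was fetched for; 'order' spaces buckets items_per_page
                # apart, so each source video maps to its own block of
                # result positions.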
if original_ids and related in original_ids:
idx = original_ids.index(related)
else:
idx = 0
item['order'] = items_per_page * idx + len(item_store[idx])
item_store[idx].append(item)

history = self.get_playlist_items(history_id, max_results=50)
update_counts(cached)

if not history.get('items'):
return payload
# Fetch history and recommended items. Use threads for faster execution.
def helper(video_id, responses):
related_videos = self.get_related_videos(video_id)
if related_videos:
responses.extend(related_videos['items'][:items_per_page])

running = 0
threads = []
candidates = []
already_fetched_for_video_ids = [item['plugin_fetched_for'] for item in items]
history_items = [item for item in history['items']
if re.match(r'(?P<video_id>[\w-]{11})',
item['snippet']['resourceId']['videoId'])]
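        # Spawn one worker per history video that has not already
        # contributed related items (tracked in counts['_related']).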
for video_id in video_ids:
if video_id in counts['_related']:
continue
running += 1
thread = threading.Thread(
target=helper,
args=(video_id, candidates),
)
thread.daemon = True
threads.append(thread)
thread.start()

        # TODO: It would be nice to make this limit of 8 user-configurable
for item in history_items[:8]:
video_id = item['snippet']['resourceId']['videoId']
if video_id not in already_fetched_for_video_ids:
thread = threading.Thread(target=helper, args=(video_id, candidates))
threads.append(thread)
thread.start()

for thread in threads:
thread.join()

# Prepend new candidates to items
seen = [item['id']['videoId'] for item in items]
for candidate in candidates:
vid = candidate['id']['videoId']
if vid not in seen:
seen.append(vid)
candidate['plugin_created_date'] = datetime_parser.since_epoch()
items.insert(0, candidate)
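        # Wait for the workers: each pass joins with a five second
        # timeout and decrements the running count as threads finish.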
while running:
for thread in threads:
thread.join(5)
if not thread.is_alive():
running -= 1

update_counts(candidates[:500], items, video_ids)

# Truncate items to keep it manageable, and cache
items = items[:500]
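        # Flatten the per-source buckets, then top the pool up with
        # previously cached items to a hard cap of 500 entries.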
items = list(chain.from_iterable(items))
counts['_counter'] = len(items)
remaining = 500 - counts['_counter']
if remaining > 0:
items.extend(cached[:remaining])
elif remaining:
items = items[:500]
cache.set_item(cache_items_key, items)

# Build the result set
items.sort(
key=lambda a: a.get('plugin_created_date', 0),
reverse=True
)
sorted_items = []
counter = 0
channel_counts = {}
while items:
counter += 1

# Hard stop on iteration. Good enough for our purposes.
if counter >= 1000:
break
        # Finally sort items per page by rank, with some randomisation,
        # for a better distribution
def rank_and_sort(item):
if 'order' not in item:
counts['_counter'] += 1
item['order'] = counts['_counter']

# Reset channel counts on a new page
if counter % 50 == 0:
channel_counts = {}
page = 1 + item['order'] // items_per_page
page_count = counts['_pages'].setdefault(page, {'_counter': 0})
while page_count['_counter'] < items_per_page and page > 1:
page -= 1
page_count = counts['_pages'].setdefault(page, {'_counter': 0})
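            # Walk back towards page 1 while earlier pages still have
            # free slots; the loop further below then pushes the item
            # forward past full pages and past the diversity limits.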

# Ensure a single channel isn't hogging the page
item = items.pop()
related_video = item['related_video_id']
related_channel = item['related_channel_id']
channel_id = item.get('snippet', {}).get('channelId')
if not channel_id:
continue

channel_counts.setdefault(channel_id, 0)
if channel_counts[channel_id] <= 3:
# Use the item
channel_counts[channel_id] += 1
item["page_number"] = counter // 50
sorted_items.append(item)
else:
# Move the item to the end of the list
items.append(item)

# Finally sort items per page by date for a better distribution
def _sort_by_date_time(item):
return (item['page_number'],
-datetime_parser.since_epoch(datetime_parser.parse(
item['snippet']['publishedAt']
)))
            # The video's channel and the related channel can be the same,
            # which can double up the channel count. Checking for this allows
            # more similar videos in the recommendations; ignoring it, as we
            # currently do, allows for more variety.
            # if channel_id == related_channel:
            #     channel_id = None
diversity_limits = items_per_page // 5
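            # At most one fifth of a page may come from the same source
            # video, source channel, or uploading channel.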
while (page_count['_counter'] >= items_per_page
or (related_video in page_count
and page_count[related_video] >= diversity_limits)
or (related_channel and related_channel in page_count
and page_count[related_channel] >= diversity_limits)
or (channel_id and channel_id in page_count
and page_count[channel_id] >= diversity_limits)
):
page += 1
page_count = counts['_pages'].setdefault(page, {'_counter': 0})

page_count.setdefault(related_video, 0)
page_count[related_video] += 1
if related_channel:
page_count.setdefault(related_channel, 0)
page_count[related_channel] += 1
if channel_id:
page_count.setdefault(channel_id, 0)
page_count[channel_id] += 1
page_count['_counter'] += 1
item['page'] = page

item_count = counts[item['id']]
item['rank'] = (2 * sum(item_count['channels'].values())
+ sum(item_count['related'].values()))

return (
-item['page'],
item['rank'],
-randint(0, item['order'])
)

sorted_items.sort(key=_sort_by_date_time)
items.sort(key=rank_and_sort, reverse=True)
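        # With reverse=True the negated page number yields ascending
        # pages, higher ranked items lead each page, and the random
        # component breaks ties between otherwise equal items.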

# Finalize result
payload['items'] = sorted_items
payload['items'] = items
"""
# TODO:
# Enable pagination
@@ -501,9 +562,9 @@ def _sort_by_date_time(item):
}
"""
# Update cache
cache.set_item(cache_home_key, payload)
# Currently disabled to allow some variation in recommendations
# cache.set_item(cache_home_key, payload)

        # If there are no items, get_activities falls back to the default
        # API behaviour
return payload

def get_activities(self, channel_id, page_token='', **kwargs):
@@ -514,7 +575,7 @@ def get_activities(self, channel_id, page_token='', **kwargs):

if channel_id == 'home':
recommended = self._get_recommendations_for_home()
if 'items' in recommended and recommended.get('items'):
if 'items' in recommended and recommended['items']:
return recommended
if channel_id == 'home':
params['home'] = 'true'
@@ -781,18 +842,49 @@ def get_related_videos(self,
if not related_videos:
return []

channel_id = self.json_traverse(
result,
path=(
'contents',
'twoColumnWatchNextResults',
'results',
'results',
'contents',
1,
'videoSecondaryInfoRenderer',
'owner',
'videoOwnerRenderer',
'title',
'runs',
0,
'navigationEndpoint',
'browseEndpoint',
'browseId'
)
)
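        # json_traverse walks the nested innertube response; this path
        # ends at the uploader's browseId, i.e. the channel id of the
        # video for which related results were fetched.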

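        # Re-shape the innertube results into a Data API v3 style
        # videoListResponse so downstream code can treat related videos
        # like any other v3 listing.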
v3_response = {
'kind': 'youtube#videoListResponse',
'items': [
{
'kind': "youtube#video",
'id': video['videoId'],
'related_video_id': video_id,
'related_channel_id': channel_id,
'snippet': {
'title': video['title']['simpleText'],
'thumbnails': dict(zip(
('default', 'high'),
video['thumbnail']['thumbnails'],
)),
'channelId': self.json_traverse(video, (
('longBylineText', 'shortBylineText'),
'runs',
0,
'navigationEndpoint',
'browseEndpoint',
'browseId',
)),
}
}
for video in related_videos
