Skip to content

Commit

Permalink
Paths file is checked every day and newer is downloaded if available
Browse files Browse the repository at this point in the history
  • Loading branch information
Screwdgeh committed Nov 13, 2020
1 parent ad93e30 commit 1d2a2f0
Showing 1 changed file with 81 additions and 64 deletions.
145 changes: 81 additions & 64 deletions custom_components/samsungtv_tizen/media_player.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Support for interface with an Samsung TV."""
import asyncio
from datetime import timedelta, datetime
from dateutil.parser import parse as parsedate
import logging
import socket
import json
Expand Down Expand Up @@ -128,8 +129,10 @@
}
)

MEDIA_IMAGE_OPTIONS = {'none': 'none', 'blue-color': '05a9f4-color', 'blue-white': '05a9f4-white', 'dark-white': '282c34-white', 'darkblue-white': '212c39-white', 'white-color': 'fff-color', 'transparent-color': 'transparent-color', 'transparent-white': 'transparent-white'}
MEDIA_IMAGE_OPTIONS = {'none': 'none', 'bluecolor': '05a9f4-color', 'blue-white': '05a9f4-white', 'dark-white': '282c34-white', 'darkblue-white': '212c39-white', 'white-color': 'fff-color', 'transparent-color': 'transparent-color', 'transparent-white': 'transparent-white'}
MEDIA_IMAGE_BASE_URL = 'https://jaruba.github.io/channel-logos/'
MEDIA_FILE_IMAGE_TO_PATH = os.path.dirname(os.path.realpath(__file__)) + '/logo_paths.json'
MEDIA_FILE_DAYS_BEFORE_UPDATE = 1
MEDIA_IMAGE_MIN_SCORE_REQUIRED = 80
MEDIA_TITLE_KEYWORD_REMOVAL = ['HD']

Expand Down Expand Up @@ -268,15 +271,16 @@ def __init__(self, host, port, name, timeout, mac, uuid, update_method, update_c
self._media_title = None
self._media_image_url = None
self._media_image_base_url = None
self._last_paths_file_check = None

if show_logos in MEDIA_IMAGE_OPTIONS:
if MEDIA_IMAGE_OPTIONS[show_logos] == "none":
self._media_image_base_url = None
else:
self._media_image_base_url = "https://jaruba.github.io/channel-logos/export/{}".format(MEDIA_IMAGE_OPTIONS[show_logos])
self._media_image_base_url = "{}export/{}".format(MEDIA_IMAGE_BASE_URL, MEDIA_IMAGE_OPTIONS[show_logos])
else:
_LOGGER.warning("Unrecognized value '%s' for 'show_logos' option (%s). Using default value.",show_logos,self._name)
self._media_image_base_url = "https://jaruba.github.io/channel-logos/export/fff-color"
self._media_image_base_url = "{}export/fff-color".format(MEDIA_IMAGE_BASE_URL)

def _split_app_list(self, app_list, sep = "/"):
retval = {"app": {}, "appST": {}}
Expand Down Expand Up @@ -423,8 +427,13 @@ def update(self):
if self._state == STATE_OFF:
self._end_of_power_off = None

if self._last_paths_file_check is None or self._last_paths_file_check < (datetime.now().astimezone()-timedelta(days=MEDIA_FILE_DAYS_BEFORE_UPDATE)):
self._ensure_latest_path_file()
self._last_paths_file_check = datetime.now().astimezone()

self.hass.async_add_job(self._update_media_data)


def _get_source(self):
"""Return the current input source."""
if self._state != STATE_OFF:
Expand Down Expand Up @@ -569,6 +578,7 @@ def media_image_url(self):
"""Return the media image URL."""
return self._media_image_url


async def _update_media_data(self):
if self._state == STATE_OFF and self._update_method != "smartthings":
self._media_title = None
Expand Down Expand Up @@ -605,6 +615,33 @@ async def _update_media_data(self):
self._media_image_url = None
self._media_title = new_media_title


def _ensure_latest_path_file(self):
''' Does check if paths files exists and if it does - is it out of date or not'''
if os.path.isfile(MEDIA_FILE_IMAGE_TO_PATH):
file_date = datetime.utcfromtimestamp(os.path.getmtime(MEDIA_FILE_IMAGE_TO_PATH)).astimezone()
if file_date < (datetime.now().astimezone()-timedelta(days=MEDIA_FILE_DAYS_BEFORE_UPDATE)):
try:
response = requests.head(MEDIA_IMAGE_BASE_URL + "logo_paths.json")
url_date = parsedate(response.headers.get("Last-Modified")).astimezone()
if url_date > file_date:
self._download_latest_path_file()
except:
_LOGGER.warning("Not able to update paths file for logos. No logos might be shown.")
else:
self._download_latest_path_file()


def _download_latest_path_file(self):
try:
response = requests.get(MEDIA_IMAGE_BASE_URL + "logo_paths.json")
if response.text:
with open(MEDIA_FILE_IMAGE_TO_PATH, mode='w+',encoding='utf-8') as paths_file:
paths_file.write(response.text)
except:
_LOGGER.warning("Not able to update paths file for logos. No logos might be shown.")


async def _match_title_to_image(self, media_title):
if media_title is not None:
for word in MEDIA_TITLE_KEYWORD_REMOVAL:
Expand All @@ -613,29 +650,30 @@ async def _match_title_to_image(self, media_title):
try:
with open(MEDIA_FILE_IMAGE_TO_PATH, 'r') as f:
image_paths = iter(json.load(f).items())
best_match = {"ratio": None, "title": None, "path": None}
while True:
try:
image_path = next(image_paths)
ratio = self._levenshtein_ratio(media_title,image_path[0].lower())
if best_match["ratio"] is None or ratio > best_match["ratio"]:
best_match = {"ratio":ratio,"title":image_path[0],"path":image_path[1]}
except StopIteration:
break

if best_match["ratio"] > MEDIA_IMAGE_MIN_SCORE_REQUIRED/100:
_LOGGER.debug("Match found for %s: %s (%f) %s", media_title, best_match["title"], best_match["ratio"], self._media_image_base_url+best_match["path"])
self._media_image_url = self._media_image_base_url+best_match["path"]
else:
_LOGGER.debug("No match found for %s: best candidate was %s (%f) %s", media_title, best_match["title"], best_match["ratio"], self._media_image_base_url+best_match["path"])
self._media_image_url = None
except:
self._media_image_url = None

best_match = {"ratio": None, "title": None, "path": None}
while True:
try:
image_path = next(image_paths)
ratio = levenshtein_ratio_and_distance(media_title,image_path[0].lower(),ratio_calc = True)
if best_match["ratio"] is None or ratio > best_match["ratio"]:
best_match = {"ratio":ratio,"title":image_path[0],"path":image_path[1]}
except StopIteration:
break

if best_match["ratio"] > MEDIA_IMAGE_MIN_SCORE_REQUIRED/100:
_LOGGER.debug("Match found for %s: %s (%f) %s", media_title, best_match["title"], best_match["ratio"], self._media_image_base_url+best_match["path"])
self._media_image_url = self._media_image_base_url+best_match["path"]
else:
_LOGGER.debug("No match found for %s: best candidate was %s (%f) %s", media_title, best_match["title"], best_match["ratio"], self._media_image_base_url+best_match["path"])
self._media_image_url = None
_LOGGER.warning("Not able to process logos. Logo paths file might be missing.")
else:
_LOGGER.debug("No media title right now!")
self._media_image_url = None


@property
def state(self):
"""Return the state of the device."""
Expand Down Expand Up @@ -937,47 +975,26 @@ async def async_select_source(self, source):
self._last_source_time = datetime.now()
self._source = source

def levenshtein_ratio_and_distance(s, t, ratio_calc = False):
""" levenshtein_ratio_and_distance:
Calculates levenshtein distance between two strings.
If ratio_calc = True, the function computes the
levenshtein distance ratio of similarity between two strings
For all i and j, distance[i,j] will contain the Levenshtein
distance between the first i characters of s and the
first j characters of t
"""
# Initialize matrix of zeros
rows = len(s)+1
cols = len(t)+1
distance = np.zeros((rows,cols),dtype = int)

# Populate matrix of zeros with the indeces of each character of both strings
for i in range(1, rows):
for k in range(1,cols):
distance[i][0] = i
distance[0][k] = k

# Iterate over the matrix to compute the cost of deletions,insertions and/or substitutions
for col in range(1, cols):
for row in range(1, rows):
if s[row-1] == t[col-1]:
cost = 0 # If the characters are the same in the two strings in a given position [i,j] then the cost is 0
else:
# In order to align the results with those of the Python Levenshtein package, if we choose to calculate the ratio
# the cost of a substitution is 2. If we calculate just distance, then the cost of a substitution is 1.
if ratio_calc == True:
cost = 2
else:
cost = 1
distance[row][col] = min(distance[row-1][col] + 1, # Cost of deletions
distance[row][col-1] + 1, # Cost of insertions
distance[row-1][col-1] + cost) # Cost of substitutions
if ratio_calc == True:
# Computation of the Levenshtein Distance Ratio
Ratio = ((len(s)+len(t)) - distance[row][col]) / (len(s)+len(t))
return Ratio
else:
# print(distance) # Uncomment if you want to see the matrix showing how the algorithm computes the cost of deletions,
# insertions and/or substitutions
# This is the minimum number of edits needed to convert string a to string b
return "The strings are {} edits away".format(distance[row][col])
@staticmethod
def _levenshtein_ratio(s, t):
rows = len(s)+1
cols = len(t)+1
distance = np.zeros((rows,cols),dtype = int)

for i in range(1, rows):
for k in range(1,cols):
distance[i][0] = i
distance[0][k] = k

for col in range(1, cols):
for row in range(1, rows):
if s[row-1] == t[col-1]:
cost = 0
else:
cost = 2
distance[row][col] = min(distance[row-1][col] + 1,
distance[row][col-1] + 1,
distance[row-1][col-1] + cost)

ratio = ((len(s)+len(t)) - distance[row][col]) / (len(s)+len(t))
return ratio

0 comments on commit 1d2a2f0

Please sign in to comment.