From 5080caa38e61ee9c1b70f4c8c40944f55af90d68 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Sun, 18 Aug 2024 12:56:47 +0200 Subject: [PATCH] [#422] Enabled support for yt-dlp mux+transcoding in media plugins Reviewed-on: https://git.platypush.tech/platypush/platypush/pulls/423 --- .../platypush/plugins/media.omxplayer.rst | 6 - docs/source/plugins.rst | 1 - .../app/streaming/plugins/media/_registry.py | 13 +- .../backend/http/media/handlers/__init__.py | 4 - platypush/backend/http/media/handlers/file.py | 3 + platypush/bus/redis.py | 1 + platypush/common/gstreamer/__init__.py | 49 +- platypush/message/event/__init__.py | 46 +- platypush/plugins/media/__init__.py | 595 +++++------------- platypush/plugins/media/_constants.py | 89 +++ platypush/plugins/media/_download.py | 354 ----------- platypush/plugins/media/_model.py | 29 + platypush/plugins/media/_resource.py | 21 - platypush/plugins/media/_resource/__init__.py | 12 + platypush/plugins/media/_resource/_base.py | 134 ++++ .../media/_resource/downloaders/__init__.py | 16 + .../media/_resource/downloaders/_base.py | 208 ++++++ .../media/_resource/downloaders/http.py | 73 +++ .../media/_resource/downloaders/youtube.py | 232 +++++++ platypush/plugins/media/_resource/file.py | 23 + platypush/plugins/media/_resource/http.py | 19 + .../media/_resource/parsers/__init__.py | 26 + .../plugins/media/_resource/parsers/_base.py | 28 + .../plugins/media/_resource/parsers/file.py | 37 ++ .../plugins/media/_resource/parsers/http.py | 31 + .../media/_resource/parsers/torrent.py | 62 ++ .../media/_resource/parsers/youtube.py | 119 ++++ platypush/plugins/media/_resource/youtube.py | 145 +++++ platypush/plugins/media/_search/__init__.py | 59 ++ .../media/{search => _search}/jellyfin.py | 17 +- .../{search => _search}/local/__init__.py | 7 +- .../media/{search => _search}/local/db.py | 0 .../{search => _search}/local/metadata.py | 4 +- platypush/plugins/media/_search/plex.py | 91 +++ .../media/{search => _search}/torrent.py | 17 +- .../media/{search => _search}/youtube.py | 5 +- platypush/plugins/media/_state.py | 12 - .../plugins/media/chromecast/__init__.py | 101 ++- .../plugins/media/chromecast/_listener.py | 10 +- platypush/plugins/media/gstreamer/__init__.py | 149 +++-- platypush/plugins/media/gstreamer/model.py | 72 ++- platypush/plugins/media/kodi/__init__.py | 148 +++-- platypush/plugins/media/mplayer/__init__.py | 28 +- platypush/plugins/media/mpv/__init__.py | 70 ++- platypush/plugins/media/omxplayer/__init__.py | 446 ------------- .../plugins/media/omxplayer/manifest.json | 17 - platypush/plugins/media/search/__init__.py | 33 - platypush/plugins/media/search/plex.py | 61 -- platypush/plugins/media/vlc/__init__.py | 177 ++++-- 49 files changed, 2218 insertions(+), 1682 deletions(-) delete mode 100644 docs/source/platypush/plugins/media.omxplayer.rst create mode 100644 platypush/plugins/media/_constants.py delete mode 100644 platypush/plugins/media/_download.py create mode 100644 platypush/plugins/media/_model.py delete mode 100644 platypush/plugins/media/_resource.py create mode 100644 platypush/plugins/media/_resource/__init__.py create mode 100644 platypush/plugins/media/_resource/_base.py create mode 100644 platypush/plugins/media/_resource/downloaders/__init__.py create mode 100644 platypush/plugins/media/_resource/downloaders/_base.py create mode 100644 platypush/plugins/media/_resource/downloaders/http.py create mode 100644 platypush/plugins/media/_resource/downloaders/youtube.py create mode 100644 platypush/plugins/media/_resource/file.py create mode 100644 platypush/plugins/media/_resource/http.py create mode 100644 platypush/plugins/media/_resource/parsers/__init__.py create mode 100644 platypush/plugins/media/_resource/parsers/_base.py create mode 100644 platypush/plugins/media/_resource/parsers/file.py create mode 100644 platypush/plugins/media/_resource/parsers/http.py create mode 100644 platypush/plugins/media/_resource/parsers/torrent.py create mode 100644 platypush/plugins/media/_resource/parsers/youtube.py create mode 100644 platypush/plugins/media/_resource/youtube.py create mode 100644 platypush/plugins/media/_search/__init__.py rename platypush/plugins/media/{search => _search}/jellyfin.py (56%) rename platypush/plugins/media/{search => _search}/local/__init__.py (98%) rename platypush/plugins/media/{search => _search}/local/db.py (100%) rename platypush/plugins/media/{search => _search}/local/metadata.py (95%) create mode 100644 platypush/plugins/media/_search/plex.py rename platypush/plugins/media/{search => _search}/torrent.py (56%) rename platypush/plugins/media/{search => _search}/youtube.py (79%) delete mode 100644 platypush/plugins/media/_state.py delete mode 100644 platypush/plugins/media/omxplayer/__init__.py delete mode 100644 platypush/plugins/media/omxplayer/manifest.json delete mode 100644 platypush/plugins/media/search/__init__.py delete mode 100644 platypush/plugins/media/search/plex.py diff --git a/docs/source/platypush/plugins/media.omxplayer.rst b/docs/source/platypush/plugins/media.omxplayer.rst deleted file mode 100644 index c5530bf01..000000000 --- a/docs/source/platypush/plugins/media.omxplayer.rst +++ /dev/null @@ -1,6 +0,0 @@ -``media.omxplayer`` -===================================== - -.. automodule:: platypush.plugins.media.omxplayer - :members: - diff --git a/docs/source/plugins.rst b/docs/source/plugins.rst index 5ee580201..b418ebc27 100644 --- a/docs/source/plugins.rst +++ b/docs/source/plugins.rst @@ -77,7 +77,6 @@ Plugins platypush/plugins/media.kodi.rst platypush/plugins/media.mplayer.rst platypush/plugins/media.mpv.rst - platypush/plugins/media.omxplayer.rst platypush/plugins/media.plex.rst platypush/plugins/media.subtitles.rst platypush/plugins/media.vlc.rst diff --git a/platypush/backend/http/app/streaming/plugins/media/_registry.py b/platypush/backend/http/app/streaming/plugins/media/_registry.py index 55202e72f..8a9e62690 100644 --- a/platypush/backend/http/app/streaming/plugins/media/_registry.py +++ b/platypush/backend/http/app/streaming/plugins/media/_registry.py @@ -25,10 +25,15 @@ def load_media_map() -> MediaMap: logger().warning('Could not load media map: %s', e) return {} - return { - media_id: MediaHandler.build(**media_info) - for media_id, media_info in media_map.items() - } + parsed_map = {} + for media_id, media_info in media_map.items(): + try: + parsed_map[media_id] = MediaHandler.build(**media_info) + except Exception as e: + logger().debug('Could not load media %s: %s', media_id, e) + continue + + return parsed_map def save_media_map(new_map: MediaMap): diff --git a/platypush/backend/http/media/handlers/__init__.py b/platypush/backend/http/media/handlers/__init__.py index 921faeaed..99948c5cd 100644 --- a/platypush/backend/http/media/handlers/__init__.py +++ b/platypush/backend/http/media/handlers/__init__.py @@ -1,7 +1,6 @@ from abc import ABC, abstractmethod import hashlib import logging -import os from typing import Generator, Optional from platypush.message import JSONAble @@ -57,9 +56,6 @@ def build(cls, source: str, *args, **kwargs) -> 'MediaHandler': logging.exception(e) errors[hndl_class.__name__] = str(e) - if os.path.exists(source): - source = f'file://{source}' - raise AttributeError( f'The source {source} has no handlers associated. Errors: {errors}' ) diff --git a/platypush/backend/http/media/handlers/file.py b/platypush/backend/http/media/handlers/file.py index e299c9359..fe73719b8 100644 --- a/platypush/backend/http/media/handlers/file.py +++ b/platypush/backend/http/media/handlers/file.py @@ -15,6 +15,9 @@ class FileHandler(MediaHandler): prefix_handlers = ['file://'] def __init__(self, source, *args, **kwargs): + if isinstance(source, str) and os.path.exists(source): + source = f'file://{source}' + super().__init__(source, *args, **kwargs) self.path = os.path.abspath( diff --git a/platypush/bus/redis.py b/platypush/bus/redis.py index e5969452f..9c707dc31 100644 --- a/platypush/bus/redis.py +++ b/platypush/bus/redis.py @@ -70,6 +70,7 @@ def poll(self): try: data = msg.get('data', b'').decode('utf-8') + logger.debug('Received message on the Redis bus: %r', data) parsed_msg = Message.build(data) if parsed_msg and self.on_message: self.on_message(parsed_msg) diff --git a/platypush/common/gstreamer/__init__.py b/platypush/common/gstreamer/__init__.py index bb4e53129..33a542998 100644 --- a/platypush/common/gstreamer/__init__.py +++ b/platypush/common/gstreamer/__init__.py @@ -29,10 +29,10 @@ def __init__(self): self.bus = self.pipeline.get_bus() self.bus.add_signal_watch() - self.bus.connect('message::eos', self.on_eos) - self.bus.connect('message::error', self.on_error) + self.bus.connect('message', self.on_message) self.data_ready = threading.Event() self.data = None + self._gst_state = Gst.State.NULL def add(self, element_name: str, *args, **props): el = Gst.ElementFactory.make(element_name, *args) @@ -94,6 +94,28 @@ def set_volume(self, volume: float): assert self.source, 'No source initialized' self.source.set_property('volume', volume) + def _msg_handler(self, message) -> bool: + from gi.repository import Gst # type: ignore[attr-defined] + + if message.type == Gst.MessageType.EOS: + self.on_eos() + return True + + if message.type == Gst.MessageType.ERROR: + err, debug = message.parse_error() + self.on_error(err, debug) + return True + + if message.type == Gst.MessageType.STATE_CHANGED: + old_state, new_state, _ = message.parse_state_changed()[:3] + self.on_state_changed(old_state, new_state) + return True + + return False # The message was not handled + + def on_message(self, _, message, *__): + self._msg_handler(message) + def on_buffer(self, sink): sample = GstApp.AppSink.pull_sample(sink) buffer = sample.get_buffer() @@ -106,6 +128,29 @@ def on_eos(self, *_, **__): self.logger.info('End of stream event received') self.stop() + def on_state_changed(self, old_state, new_state): + from gi.repository import Gst # type: ignore[attr-defined] + + if ( + old_state == new_state + or new_state == self._gst_state + or old_state != self._gst_state + ): + return + + self._gst_state = new_state + + if new_state == Gst.State.PLAYING: + self.on_play() + elif new_state == Gst.State.PAUSED: + self.on_pause() + + def on_play(self): + self.logger.debug('GStreamer playback started') + + def on_pause(self): + self.logger.debug('GStreamer playback paused') + def on_error(self, _, msg): self.logger.warning('GStreamer pipeline error: %s', msg.parse_error()) self.stop() diff --git a/platypush/message/event/__init__.py b/platypush/message/event/__init__.py index 465b981ad..673435131 100644 --- a/platypush/message/event/__init__.py +++ b/platypush/message/event/__init__.py @@ -1,4 +1,3 @@ -import copy import json import logging import random @@ -7,11 +6,11 @@ from dataclasses import dataclass, field from datetime import date -from typing import Any +from typing import Any, Optional, Union from platypush.config import Config from platypush.message import Message -from platypush.utils import get_event_class_by_type +from platypush.utils import get_event_class_by_type, get_plugin_name_by_class logger = logging.getLogger('platypush') @@ -261,7 +260,7 @@ def as_dict(self): """ Converts the event into a dictionary """ - args = copy.deepcopy(self.args) + args = dict(deepcopy(self.args)) flatten(args) return { 'type': 'event', @@ -277,7 +276,7 @@ def __str__(self): Overrides the str() operator and converts the message into a UTF-8 JSON string """ - args = copy.deepcopy(self.args) + args = deepcopy(self.args) flatten(args) return json.dumps(self.as_dict(), cls=self.Encoder) @@ -297,6 +296,43 @@ class EventMatchResult: parsed_args: dict = field(default_factory=dict) +def deepcopy( + args: Union[dict, list], _out: Optional[Union[dict, list]] = None +) -> Union[dict, list]: + """ + Workaround implementation of deepcopy that doesn't raise exceptions + on non-pickeable objects. + """ + from platypush.plugins import Plugin + + if _out is None: + _out = {} if isinstance(args, dict) else [] + + if isinstance(args, list): + _out = [None] * len(args) + for i, v in enumerate(args): + if isinstance(v, dict): + _out[i] = deepcopy(v) + elif isinstance(v, (list, tuple, set)): + _out[i] = deepcopy(list(v)) + elif isinstance(v, Plugin): + _out[i] = get_plugin_name_by_class(v.__class__) + else: + _out[i] = v + elif isinstance(args, dict): + for k, v in args.items(): + if isinstance(v, dict): + _out[k] = deepcopy(v) + elif isinstance(v, (list, tuple, set)): + _out[k] = deepcopy(list(v)) + elif isinstance(v, Plugin): + _out[k] = get_plugin_name_by_class(v.__class__) + else: + _out[k] = v + + return _out + + def flatten(args): """ Flatten a nested dictionary for string serialization. diff --git a/platypush/plugins/media/__init__.py b/platypush/plugins/media/__init__.py index 7a96aa7f4..b6b1677cb 100644 --- a/platypush/plugins/media/__init__.py +++ b/platypush/plugins/media/__init__.py @@ -1,10 +1,6 @@ -import functools -import inspect -import json import os +import pathlib import queue -import re -import subprocess import tempfile import threading from abc import ABC, abstractmethod @@ -13,6 +9,7 @@ Iterable, List, Optional, + Sequence, Tuple, Type, Union, @@ -24,16 +21,18 @@ from platypush.context import get_plugin, get_backend from platypush.message.event.media import MediaEvent from platypush.plugins import RunnablePlugin, action -from platypush.utils import get_default_downloads_dir, get_plugin_name_by_class - -from ._download import ( - DownloadState, - DownloadThread, - FileDownloadThread, - YouTubeDownloadThread, +from platypush.utils import ( + get_default_downloads_dir, + get_mime_type, + get_plugin_name_by_class, ) + +from ._constants import audio_extensions, video_extensions +from ._model import DownloadState, PlayerState from ._resource import MediaResource -from ._state import PlayerState +from ._resource.downloaders import DownloadThread, MediaResourceDownloader, downloaders +from ._resource.parsers import MediaResourceParser, parsers +from ._search import MediaSearcher, searchers class MediaPlugin(RunnablePlugin, ABC): @@ -51,97 +50,14 @@ class MediaPlugin(RunnablePlugin, ABC): 'This method must be implemented in a derived class' ) - # Supported audio extensions - audio_extensions = { - '3gp', - 'aa', - 'aac', - 'aax', - 'act', - 'aiff', - 'amr', - 'ape', - 'au', - 'awb', - 'dct', - 'dss', - 'dvf', - 'flac', - 'gsm', - 'iklax', - 'ivs', - 'm4a', - 'm4b', - 'm4p', - 'mmf', - 'mp3', - 'mpc', - 'msv', - 'nmf', - 'nsf', - 'ogg,', - 'opus', - 'ra,', - 'raw', - 'sln', - 'tta', - 'vox', - 'wav', - 'wma', - 'wv', - 'webm', - '8svx', - } - - # Supported video extensions - video_extensions = { - 'webm', - 'mkv', - 'flv', - 'vob', - 'ogv', - 'ogg', - 'drc', - 'gif', - 'gifv', - 'mng', - 'avi', - 'mts', - 'm2ts', - 'mov', - 'qt', - 'wmv', - 'yuv', - 'rm', - 'rmvb', - 'asf', - 'amv', - 'mp4', - 'm4p', - 'm4v', - 'mpg', - 'mp2', - 'mpeg', - 'mpe', - 'mpv', - 'm2v', - 'svi', - '3gp', - '3g2', - 'mxf', - 'roq', - 'nsv', - 'f4v', - 'f4p', - 'f4a', - 'f4b', - } + audio_extensions = audio_extensions + video_extensions = video_extensions supported_media_plugins = [ + 'media.vlc', + 'media.mpv', 'media.mplayer', 'media.omxplayer', - 'media.mpv', - 'media.vlc', 'media.chromecast', 'media.gstreamer', ] @@ -156,9 +72,13 @@ def __init__( env: Optional[Dict[str, str]] = None, volume: Optional[Union[float, int]] = None, torrent_plugin: str = 'torrent', - youtube_format: Optional[str] = None, + youtube_format: Optional[str] = 'bv[height<=?1080]+ba/bv+ba', + youtube_audio_format: Optional[str] = 'ba', youtube_dl: str = 'yt-dlp', merge_output_format: str = 'mp4', + cache_dir: Optional[str] = None, + cache_streams: bool = False, + ytdl_args: Optional[Sequence[str]] = None, **kwargs, ): """ @@ -187,6 +107,9 @@ def __init__( ``bestvideo[height<=?1080][ext=mp4]+bestaudio`` - select the best mp4 video with a resolution <= 1080p, and the best audio format. + :param youtube_audio_format: Select the preferred audio format for + YouTube videos downloaded only for audio. Default: ``bestaudio``. + :param youtube_dl: Path to the ``youtube-dl`` executable, used to extract information from YouTube videos and other media platforms. Default: ``yt-dlp``. The default has changed from ``youtube-dl`` to @@ -197,12 +120,29 @@ def __init__( and the upstream media contains both audio and video to be merged, this can be used to specify the format of the output container - e.g. ``mp4``, ``mkv``, ``avi``, ``flv``. Default: ``mp4``. + + :param cache_dir: Directory where the media cache will be stored. If not + specified, the cache will be stored in the default cache directory + (usually ``~/.cache/platypush/media/``). + + :param cache_streams: If set to True, streams transcoded via yt-dlp or + ffmpeg will be cached in ``cache_dir`` directory. If not set + (default), then streams will be played directly via memory pipe. + You may want to set this to True if you have a slow network, or if + you want to play media at high quality, even though the start time + may be delayed. If set to False, the media will start playing as + soon as the stream is ready, but the quality may be lower, + especially at the beginning, and seeking may not be supported. + + :param ytdl_args: Additional arguments to pass to the youtube-dl + executable. Default: None. """ super().__init__(**kwargs) if media_dirs is None: media_dirs = [] + player = None player_config = {} self._download_threads: Dict[Tuple[str, str], DownloadThread] = {} @@ -230,6 +170,7 @@ def __init__( self.registered_actions.add(act) self._env = env or {} + self.cache_streams = cache_streams self.media_dirs = set( filter( os.path.isdir, @@ -245,7 +186,18 @@ def __init__( ) ) - os.makedirs(self.download_dir, exist_ok=True) + self.cache_dir = os.path.abspath( + os.path.expanduser(cache_dir) + if cache_dir + else os.path.join( + Config.get_cachedir(), + 'media', + get_plugin_name_by_class(self.__class__), + ) + ) + + pathlib.Path(self.cache_dir).mkdir(parents=True, exist_ok=True) + pathlib.Path(self.download_dir).mkdir(parents=True, exist_ok=True) self._ytdl = youtube_dl self.media_dirs.add(self.download_dir) self.volume = volume @@ -253,62 +205,31 @@ def __init__( self._youtube_proc = None self.torrent_plugin = torrent_plugin self.youtube_format = youtube_format + self.youtube_audio_format = youtube_audio_format self.merge_output_format = merge_output_format + self.ytdl_args = ytdl_args or [] self._latest_resource: Optional[MediaResource] = None - @staticmethod - def _torrent_event_handler(evt_queue): - def handler(event): - # More than 5% of the torrent has been downloaded - if event.args.get('progress', 0) > 5 and event.args.get('files'): - evt_queue.put(event.args['files']) - - return handler - - def get_extractors(self): - try: - from yt_dlp.extractor import _extractors # type: ignore - except ImportError: - self.logger.debug('yt_dlp not installed') - return - - for _, obj_type in inspect.getmembers(_extractors): - if ( - inspect.isclass(obj_type) - and isinstance(getattr(obj_type, "_VALID_URL", None), str) - and obj_type.__name__ != "GenericIE" - ): - yield obj_type - - def _is_youtube_resource(self, resource: str): - return any( - re.search(getattr(extractor, '_VALID_URL', '^$'), resource) - for extractor in self.get_extractors() - ) - - def _get_youtube_best_thumbnail(self, info: Dict[str, dict]): - thumbnails = info.get('thumbnails', {}) - if not thumbnails: - return None - - # Preferred resolution - for res in ((640, 480), (480, 360), (320, 240)): - thumb = next( - ( - thumb - for thumb in thumbnails - if thumb.get('width') == res[0] and thumb.get('height') == res[1] - ), - None, - ) + self._parsers: Dict[Type[MediaResourceParser], MediaResourceParser] = { + parser: parser(self) for parser in parsers + } - if thumb: - return thumb.get('url') + self._downloaders: Dict[ + Type[MediaResourceDownloader], MediaResourceDownloader + ] = {downloader: downloader(self) for downloader in downloaders} - # Default fallback (best quality) - return info.get('thumbnail') + self._searchers: Dict[Type[MediaSearcher], MediaSearcher] = { + searcher: searcher(dirs=self.media_dirs, media_plugin=self) + for searcher in searchers + } - def _get_resource(self, resource: str): + def _get_resource( + self, + resource: str, + metadata: Optional[dict] = None, + only_audio: bool = False, + **_, + ): """ :param resource: Resource to play/parse. Supported types: @@ -319,72 +240,19 @@ def _get_resource(self, resource: str): """ - if resource.startswith('file://'): - path = resource[len('file://') :] - assert os.path.isfile(path), f'File {path} not found' - self._latest_resource = MediaResource( - resource=resource, - url=resource, - title=os.path.basename(resource), - filename=os.path.basename(resource), - ) - elif self._is_youtube_resource(resource): - info = self._get_youtube_info(resource) - if info: - url = info.get('url') - if url: - resource = url - self._latest_resource = MediaResource( - resource=resource, - url=resource, - title=info.get('title'), - description=info.get('description'), - filename=info.get('filename'), - image=info.get('thumbnail'), - duration=float(info.get('duration') or 0) or None, - channel=info.get('channel'), - channel_url=info.get('channel_url'), - resolution=info.get('resolution'), - type=info.get('extractor'), - ) - elif resource.startswith('magnet:?'): - self.logger.info( - 'Downloading torrent %s to %s', resource, self.download_dir - ) - torrents = get_plugin(self.torrent_plugin) - assert torrents, f'{self.torrent_plugin} plugin not configured' - - evt_queue = queue.Queue() - torrents.download( - resource, - download_dir=self.download_dir, - _async=True, - is_media=True, - event_hndl=self._torrent_event_handler(evt_queue), - ) - - resources = [f for f in evt_queue.get()] # noqa: C416,R1721 - - if resources: - self._videos_queue = sorted(resources) - resource = self._videos_queue.pop(0) - else: - raise RuntimeError(f'No media file found in torrent {resource}') + for parser in self._parsers.values(): + media_resource = parser.parse(resource, only_audio=only_audio) + if media_resource: + for k, v in (metadata or {}).items(): + setattr(media_resource, k, v) - assert resource, 'Unable to find any compatible media resource' - return resource + return media_resource - def _stop_torrent(self): - try: - torrents = get_plugin(self.torrent_plugin) - assert torrents, f'{self.torrent_plugin} plugin not configured' - torrents.quit() - except Exception as e: - self.logger.warning('Could not stop torrent plugin: %s', e) + raise AssertionError(f'Unknown media resource: {resource}') @action @abstractmethod - def play(self, resource, **kwargs): + def play(self, resource: str, **kwargs): raise self._NOT_IMPLEMENTED_ERR @action @@ -567,43 +435,45 @@ def thread(): return thread def _get_search_handler_by_type(self, search_type: str): - if search_type == 'file': - from .search import LocalMediaSearcher - - return LocalMediaSearcher(self.media_dirs, media_plugin=self) - if search_type == 'torrent': - from .search import TorrentMediaSearcher - - return TorrentMediaSearcher(media_plugin=self) - if search_type == 'youtube': - from .search import YoutubeMediaSearcher - - return YoutubeMediaSearcher(media_plugin=self) - if search_type == 'plex': - from .search import PlexMediaSearcher - - return PlexMediaSearcher(media_plugin=self) - if search_type == 'jellyfin': - from .search import JellyfinMediaSearcher + searcher = next( + iter(filter(lambda s: s.supports(search_type), self._searchers.values())), + None, + ) - return JellyfinMediaSearcher(media_plugin=self) + if not searcher: + self.logger.warning('Unsupported search type: %s', search_type) + return None - self.logger.warning('Unsupported search type: %s', search_type) - return None + return searcher @classmethod def is_video_file(cls, filename: str): - return filename.lower().split('.')[-1] in cls.video_extensions + if filename.lower().split('.')[-1] in cls.video_extensions: + return True + + mime_type = get_mime_type(filename) + return bool(mime_type and mime_type.startswith('video/')) @classmethod def is_audio_file(cls, filename: str): - return filename.lower().split('.')[-1] in cls.audio_extensions + if filename.lower().split('.')[-1] in cls.audio_extensions: + return True - def _get_info(self, resource: str): - if self._is_youtube_resource(resource): - return self.get_youtube_info(resource) + mime_type = get_mime_type(filename) + return bool(mime_type and mime_type.startswith('audio/')) - return {'url': resource} + @classmethod + def is_media_file(cls, file: str) -> bool: + if file.split('.')[-1].lower() in cls.video_extensions.union( + cls.audio_extensions + ): + return True + + mime_type = get_mime_type(file) + return bool( + mime_type + and (mime_type.startswith('video/') or mime_type.startswith('audio/')) + ) @action def start_streaming( @@ -660,109 +530,14 @@ def stop_streaming(self, media_id: str): assert response.ok, response.text or response.reason return response.json() - def _get_youtube_info(self, url, youtube_format: Optional[str] = None): - ytdl_cmd = [ - self._ytdl, - *( - ['-f', youtube_format or self.youtube_format] - if youtube_format or self.youtube_format - else [] - ), - '-j', - '-g', - url, - ] - - self.logger.info('Executing command %s', ' '.join(ytdl_cmd)) - with subprocess.Popen(ytdl_cmd, stdout=subprocess.PIPE) as ytdl: - output = ytdl.communicate()[0].decode().strip() - ytdl.wait() - - self.logger.debug('yt-dlp output: %s', output) - lines = output.split('\n') - - if not lines: - self.logger.warning('No output from yt-dlp') - return None - - stream_url = lines[1] if len(lines) > 2 else lines[0] - info = lines[-1] - return { - **json.loads(info), - 'url': stream_url, - } - - @staticmethod - def get_youtube_id(url: str) -> Optional[str]: - patterns = [ - re.compile(pattern) - for pattern in [ - r'https?://www.youtube.com/watch\?v=([^&#]+)', - r'https?://youtube.com/watch\?v=([^&#]+)', - r'https?://youtu.be/([^&#/]+)', - r'youtube:video:([^&#:])', - ] - ] - - for pattern in patterns: - m = pattern.search(url) - if m: - return m.group(1) - - return None - - @action - def get_youtube_info(self, url): - # Legacy conversion for Mopidy YouTube URIs - m = re.match('youtube:video:(.*)', url) - if m: - url = f'https://www.youtube.com/watch?v={m.group(1)}' - - with subprocess.Popen([self._ytdl, '-j', url], stdout=subprocess.PIPE) as proc: - if proc.stdout is None: - return None - - return proc.stdout.read().decode("utf-8", "strict")[:-1] - @action def get_info(self, resource: str): - return self._get_info(resource) - - @action - def get_media_file_duration(self, filename): - """ - Get the duration of a media file in seconds. Requires ffmpeg - """ + for parser in self._parsers.values(): + info = parser.parse(resource) + if info: + return info.to_dict() - if filename.startswith('file://'): - filename = filename[7:] - - with subprocess.Popen( - ["ffprobe", filename], stdout=subprocess.PIPE, stderr=subprocess.STDOUT - ) as result: - if not result.stdout: - return 0 - - return functools.reduce( - lambda t, t_i: t + t_i, - [ - float(t) * pow(60, i) - for (i, t) in enumerate( - re.search( - r'^Duration:\s*([^,]+)', - [ - x.decode() - for x in result.stdout.readlines() - if "Duration" in x.decode() - ] - .pop() - .strip(), - ) - .group(1) # type: ignore - .split(':')[::-1] - ) - ], - ) + return {'url': resource} @action def download( @@ -774,6 +549,7 @@ def download( sync: bool = False, only_audio: bool = False, youtube_format: Optional[str] = None, + youtube_audio_format: Optional[str] = None, merge_output_format: Optional[str] = None, ): """ @@ -802,25 +578,46 @@ def download( :param only_audio: If set to True, only the audio track will be downloaded (only supported for yt-dlp-compatible URLs for now). :param youtube_format: Override the default ``youtube_format`` setting. + :param youtube_audio_format: Override the default ``youtube_audio_format`` :param merge_output_format: Override the default ``merge_output_format`` setting. :return: The absolute path to the downloaded file. """ - path = self._get_download_path( - url, directory=directory, filename=filename, youtube_format=youtube_format - ) + dl_thread = None + resource = self._get_resource(url, only_audio=only_audio) - if self._is_youtube_resource(url): - dl_thread = self._download_youtube_url( - url, path, youtube_format=youtube_format, only_audio=only_audio - ) + if filename: + path = os.path.expanduser(filename) + elif resource.filename: + path = resource.filename else: - if only_audio: - self.logger.warning( - 'Only audio download is not supported for non-YouTube URLs' + path = os.path.basename(resource.url) + + if not os.path.isabs(path): + directory = os.path.expanduser(directory or self.download_dir) + path = os.path.join(directory, path) + + for downloader in self._downloaders.values(): + if downloader.supports(resource): + if only_audio and not downloader.supports_only_audio(): + self.logger.warning( + 'Only audio download is not supported for this resource' + ) + + dl_thread = downloader.download( + resource=resource, + path=path, + timeout=timeout, + only_audio=only_audio, + youtube_format=youtube_format or self.youtube_format, + youtube_audio_format=youtube_audio_format + or self.youtube_audio_format, + merge_output_format=merge_output_format, ) - dl_thread = self._download_url(url, path, timeout=timeout) + break + + assert dl_thread, f'No downloader found for resource {url}' if sync: dl_thread.join() @@ -934,90 +731,10 @@ def _get_downloads(self, url: Optional[str] = None, path: Optional[str] = None): assert threads, f'No matching downloads found for [url={url}, path={path}]' return threads - def _get_download_path( - self, - url: str, - directory: Optional[str] = None, - filename: Optional[str] = None, - youtube_format: Optional[str] = None, - ) -> str: - if not directory: - directory = self.download_dir - - directory = os.path.expanduser(directory) - youtube_format = youtube_format or self.youtube_format - - if self._is_youtube_resource(url): - with subprocess.Popen( - [ - self._ytdl, - *( - [ - '-f', - youtube_format, - ] - if youtube_format - else [] - ), - *( - ['--merge-output-format', self.merge_output_format] - if self.merge_output_format - else [] - ), - '-O', - '%(title)s.%(ext)s', - url, - ], - stdout=subprocess.PIPE, - ) as proc: - assert proc.stdout, 'yt-dlp stdout is None' - filename = proc.stdout.read().decode()[:-1] - - if not filename: - filename = url.split('/')[-1] - - return os.path.join(directory, filename) - - def _download_url(self, url: str, path: str, timeout: int) -> FileDownloadThread: - download_thread = FileDownloadThread( - url=url, - path=path, - timeout=timeout, - on_start=self._on_download_start, - post_event=self._post_event, - stop_event=self._should_stop, - ) - - self._start_download(download_thread) - return download_thread - - def _download_youtube_url( - self, - url: str, - path: str, - youtube_format: Optional[str] = None, - merge_output_format: Optional[str] = None, - only_audio: bool = False, - ) -> YouTubeDownloadThread: - download_thread = YouTubeDownloadThread( - url=url, - path=path, - ytdl=self._ytdl, - only_audio=only_audio, - youtube_format=youtube_format or self.youtube_format, - merge_output_format=merge_output_format or self.merge_output_format, - on_start=self._on_download_start, - post_event=self._post_event, - stop_event=self._should_stop, - ) - - self._start_download(download_thread) - return download_thread - - def _on_download_start(self, thread: DownloadThread): + def on_download_start(self, thread: DownloadThread): self._download_threads[thread.url, thread.path] = thread - def _start_download(self, thread: DownloadThread): + def start_download(self, thread: DownloadThread): if (thread.url, thread.path) in self._download_threads: self.logger.warning( 'A download of %s to %s is already in progress', thread.url, thread.path @@ -1026,7 +743,7 @@ def _start_download(self, thread: DownloadThread): thread.start() - def _post_event(self, event_type: Type[MediaEvent], **kwargs): + def post_event(self, event_type: Type[MediaEvent], **kwargs): evt = event_type( player=get_plugin_name_by_class(self.__class__), plugin=self, **kwargs ) @@ -1054,6 +771,14 @@ def get_subtitles_file(subtitles: Optional[str] = None): f.write(content) return f.name + @property + def supports_local_media(self) -> bool: + return True + + @property + def supports_local_pipe(self) -> bool: + return True + def main(self): self.wait_stop() @@ -1061,6 +786,8 @@ def main(self): __all__ = [ 'DownloadState', 'MediaPlugin', + 'MediaResource', + 'MediaSearcher', 'PlayerState', ] diff --git a/platypush/plugins/media/_constants.py b/platypush/plugins/media/_constants.py new file mode 100644 index 000000000..4f4f241b8 --- /dev/null +++ b/platypush/plugins/media/_constants.py @@ -0,0 +1,89 @@ +# Supported audio extensions +audio_extensions = { + '3gp', + 'aa', + 'aac', + 'aax', + 'act', + 'aiff', + 'amr', + 'ape', + 'au', + 'awb', + 'dct', + 'dss', + 'dvf', + 'flac', + 'gsm', + 'iklax', + 'ivs', + 'm4a', + 'm4b', + 'm4p', + 'mmf', + 'mp3', + 'mpc', + 'msv', + 'nmf', + 'nsf', + 'ogg,', + 'opus', + 'ra,', + 'raw', + 'sln', + 'tta', + 'vox', + 'wav', + 'wma', + 'wv', + 'webm', + '8svx', +} + + +# Supported video extensions +video_extensions = { + 'webm', + 'mkv', + 'flv', + 'vob', + 'ogv', + 'ogg', + 'drc', + 'gif', + 'gifv', + 'mng', + 'avi', + 'mts', + 'm2ts', + 'mov', + 'qt', + 'wmv', + 'yuv', + 'rm', + 'rmvb', + 'asf', + 'amv', + 'mp4', + 'm4p', + 'm4v', + 'mpg', + 'mp2', + 'mpeg', + 'mpe', + 'mpv', + 'm2v', + 'svi', + '3gp', + '3g2', + 'mxf', + 'roq', + 'nsv', + 'f4v', + 'f4p', + 'f4a', + 'f4b', +} + + +# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/media/_download.py b/platypush/plugins/media/_download.py deleted file mode 100644 index 58b19cb74..000000000 --- a/platypush/plugins/media/_download.py +++ /dev/null @@ -1,354 +0,0 @@ -from abc import ABC, abstractmethod -from contextlib import suppress -from enum import Enum -import json -import logging -import signal -import subprocess -import threading -import time -from typing import Any, Callable, Optional, Type - -import requests - -from platypush.message.event.media import ( - MediaDownloadCancelledEvent, - MediaDownloadClearEvent, - MediaDownloadCompletedEvent, - MediaDownloadErrorEvent, - MediaDownloadEvent, - MediaDownloadPausedEvent, - MediaDownloadProgressEvent, - MediaDownloadResumedEvent, - MediaDownloadStartedEvent, -) - -from platypush.utils import wait_for_either - - -class DownloadState(Enum): - """ - Enum that represents the status of a download. - """ - - IDLE = 'idle' - STARTED = 'started' - DOWNLOADING = 'downloading' - PAUSED = 'paused' - COMPLETED = 'completed' - CANCELLED = 'cancelled' - ERROR = 'error' - - -class DownloadThread(threading.Thread, ABC): - """ - Thread that downloads a URL to a file. - """ - - _progress_update_interval = 1 - """ Throttle the progress updates to this interval, in seconds. """ - - def __init__( - self, - path: str, - url: str, - post_event: Callable, - size: Optional[int] = None, - timeout: Optional[int] = 10, - on_start: Callable[['DownloadThread'], None] = lambda _: None, - on_close: Callable[['DownloadThread'], None] = lambda _: None, - stop_event: Optional[threading.Event] = None, - ): - super().__init__(name=f'DownloadThread-{path}') - self.path = path - self.url = url - self.size = size - self.timeout = timeout - self.state = DownloadState.IDLE - self.progress = None - self.started_at = None - self.ended_at = None - self._upstream_stop_event = stop_event or threading.Event() - self._stop_event = threading.Event() - self._post_event = post_event - self._on_start = on_start - self._on_close = on_close - self._paused = threading.Event() - self._downloading = threading.Event() - self._last_progress_update_time = 0 - self.logger = logging.getLogger(__name__) - - def should_stop(self) -> bool: - return self._stop_event.is_set() or self._upstream_stop_event.is_set() - - @abstractmethod - def _run(self) -> bool: - pass - - def pause(self): - self.state = DownloadState.PAUSED - self._paused.set() - self._downloading.clear() - self.post_event(MediaDownloadPausedEvent) - - def resume(self): - self.state = DownloadState.DOWNLOADING - self._paused.clear() - self._downloading.set() - self.post_event(MediaDownloadResumedEvent) - - def run(self): - super().run() - interrupted = False - - try: - self.on_start() - interrupted = not self._run() - - if interrupted: - self.state = DownloadState.CANCELLED - else: - self.state = DownloadState.COMPLETED - except Exception as e: - self.state = DownloadState.ERROR - self.post_event(MediaDownloadErrorEvent, error=str(e)) - self.logger.warning('Error while downloading URL: %s', e) - finally: - self.on_close() - - def stop(self): - self.state = DownloadState.CANCELLED - self._stop_event.set() - self._downloading.clear() - - def on_start(self): - self.state = DownloadState.STARTED - self.started_at = time.time() - self.post_event(MediaDownloadStartedEvent) - self._on_start(self) - - def on_close(self): - self.ended_at = time.time() - if self.state == DownloadState.CANCELLED: - self.post_event(MediaDownloadCancelledEvent) - elif self.state == DownloadState.COMPLETED: - self.post_event(MediaDownloadCompletedEvent) - - self._on_close(self) - - def clear(self): - if self.state not in (DownloadState.COMPLETED, DownloadState.CANCELLED): - self.logger.info( - 'Download thread for %s is still active, stopping', self.url - ) - - self.stop() - self.join(timeout=10) - - self.post_event(MediaDownloadClearEvent) - - def post_event(self, event_type: Type[MediaDownloadEvent], **kwargs): - kwargs = { - 'resource': self.url, - 'path': self.path, - 'state': self.state.value, - 'size': self.size, - 'timeout': self.timeout, - 'progress': self.progress, - 'started_at': self.started_at, - 'ended_at': self.ended_at, - **kwargs, - } - - self._post_event(event_type, **kwargs) - - def __setattr__(self, name: str, value: Optional[Any], /) -> None: - if name == 'progress' and value is not None: - if value < 0 or value > 100: - self.logger.debug('Invalid progress value:%s', value) - return - - prev_progress = getattr(self, 'progress', None) - - if prev_progress is None or ( - int(prev_progress) != int(value) - and ( - time.time() - self._last_progress_update_time - >= self._progress_update_interval - ) - ): - value = round(value, 2) - self._last_progress_update_time = time.time() - self.post_event(MediaDownloadProgressEvent, progress=value) - - super().__setattr__(name, value) - - -class FileDownloadThread(DownloadThread): - """ - Thread that downloads a generic URL to a file. - """ - - def _run(self): - interrupted = False - - with requests.get(self.url, timeout=self.timeout, stream=True) as response: - response.raise_for_status() - self.size = int(response.headers.get('Content-Length', 0)) or None - - with open(self.path, 'wb') as f: - self.on_start() - - for chunk in response.iter_content(chunk_size=8192): - if not chunk or self.should_stop(): - interrupted = self.should_stop() - if interrupted: - self.stop() - - break - - self.state = DownloadState.DOWNLOADING - f.write(chunk) - percent = f.tell() / self.size * 100 if self.size else 0 - self.progress = percent - - if self._paused.is_set(): - wait_for_either(self._downloading, self._stop_event) - - return not interrupted - - -class YouTubeDownloadThread(DownloadThread): - """ - Thread that downloads a YouTube URL to a file. - """ - - def __init__( - self, - *args, - ytdl: str, - youtube_format: Optional[str] = None, - merge_output_format: Optional[str] = None, - only_audio: bool = False, - **kwargs, - ): - super().__init__(*args, **kwargs) - self._ytdl = ytdl - self._youtube_format = youtube_format - self._merge_output_format = merge_output_format - self._only_audio = only_audio - self._proc = None - self._proc_lock = threading.Lock() - - def _parse_progress(self, line: str): - try: - progress = json.loads(line) - except json.JSONDecodeError: - return - - status = progress.get('status') - if not status: - return - - if status == 'finished': - self.progress = 100 - return - - if status == 'paused': - self.state = DownloadState.PAUSED - elif status == 'downloading': - self.state = DownloadState.DOWNLOADING - - self.size = int(progress.get('total_bytes_estimate', 0)) or self.size - if self.size: - downloaded = int(progress.get('downloaded_bytes', 0)) - self.progress = (downloaded / self.size) * 100 - - def _run(self): - ytdl_cmd = [ - self._ytdl, - '--newline', - '--progress', - '--progress-delta', - str(self._progress_update_interval), - '--progress-template', - '%(progress)j', - *(['-x'] if self._only_audio else []), - *(['-f', self._youtube_format] if self._youtube_format else []), - *( - ['--stream-output-format', self._merge_output_format] - if self._merge_output_format else [] - ), - self.url, - '-o', - self.path, - ] - - self.logger.info('Executing command %r', ytdl_cmd) - err = None - - with subprocess.Popen(ytdl_cmd, stdout=subprocess.PIPE) as self._proc: - if self._proc.stdout: - for line in self._proc.stdout: - self.logger.debug( - '%s output: %s', self._ytdl, line.decode().strip() - ) - - self._parse_progress(line.decode()) - - if self.should_stop(): - self.stop() - return self._proc.returncode == 0 - - if self._paused.is_set(): - wait_for_either(self._downloading, self._stop_event) - - if self._proc.returncode != 0: - err = self._proc.stderr.read().decode() if self._proc.stderr else None - raise RuntimeError( - f'{self._ytdl} failed with return code {self._proc.returncode}: {err}' - ) - - return True - - def pause(self): - with self._proc_lock: - if self._proc: - self._proc.send_signal(signal.SIGSTOP) - - super().pause() - - def resume(self): - with self._proc_lock: - if self._proc: - self._proc.send_signal(signal.SIGCONT) - - super().resume() - - def stop(self): - state = None - - with suppress(IOError, OSError), self._proc_lock: - if self._proc: - if self._proc.poll() is None: - self._proc.terminate() - self._proc.wait(timeout=3) - if self._proc.returncode is None: - self._proc.kill() - - state = DownloadState.CANCELLED - elif self._proc.returncode != 0: - state = DownloadState.ERROR - else: - state = DownloadState.COMPLETED - - self._proc = None - - super().stop() - - if state: - self.state = state - - def on_close(self): - self.stop() - super().on_close() diff --git a/platypush/plugins/media/_model.py b/platypush/plugins/media/_model.py new file mode 100644 index 000000000..04b3e14fe --- /dev/null +++ b/platypush/plugins/media/_model.py @@ -0,0 +1,29 @@ +import enum + + +class PlayerState(enum.Enum): + """ + Models the possible states of a media player + """ + + STOP = 'stop' + PLAY = 'play' + PAUSE = 'pause' + IDLE = 'idle' + + +class DownloadState(enum.Enum): + """ + Enum that represents the status of a download. + """ + + IDLE = 'idle' + STARTED = 'started' + DOWNLOADING = 'downloading' + PAUSED = 'paused' + COMPLETED = 'completed' + CANCELLED = 'cancelled' + ERROR = 'error' + + +# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/media/_resource.py b/platypush/plugins/media/_resource.py deleted file mode 100644 index a54e54d2c..000000000 --- a/platypush/plugins/media/_resource.py +++ /dev/null @@ -1,21 +0,0 @@ -from dataclasses import dataclass -from typing import Optional - - -@dataclass -class MediaResource: - """ - Models a media resource - """ - - resource: str - url: str - title: Optional[str] = None - description: Optional[str] = None - filename: Optional[str] = None - image: Optional[str] = None - duration: Optional[float] = None - channel: Optional[str] = None - channel_url: Optional[str] = None - type: Optional[str] = None - resolution: Optional[str] = None diff --git a/platypush/plugins/media/_resource/__init__.py b/platypush/plugins/media/_resource/__init__.py new file mode 100644 index 000000000..fcc64c6f8 --- /dev/null +++ b/platypush/plugins/media/_resource/__init__.py @@ -0,0 +1,12 @@ +from ._base import MediaResource +from .file import FileMediaResource +from .http import HttpMediaResource +from .youtube import YoutubeMediaResource + + +__all__ = [ + 'FileMediaResource', + 'HttpMediaResource', + 'MediaResource', + 'YoutubeMediaResource', +] diff --git a/platypush/plugins/media/_resource/_base.py b/platypush/plugins/media/_resource/_base.py new file mode 100644 index 000000000..c5317512f --- /dev/null +++ b/platypush/plugins/media/_resource/_base.py @@ -0,0 +1,134 @@ +import io +import logging +from abc import ABC, abstractmethod +from dataclasses import dataclass, field, fields +from subprocess import Popen +from typing import IO, Iterable, Optional + +from platypush.plugins import Plugin + + +@dataclass +class MediaResource(ABC): + """ + Models a generic media resource. + + In this case resource/URL can be passed directly to the player. + """ + + resource: str + url: str + media_plugin: Plugin + fd: Optional[IO] = None + title: Optional[str] = None + ext: Optional[str] = None + description: Optional[str] = None + filename: Optional[str] = None + size: Optional[int] = None + image: Optional[str] = None + duration: Optional[float] = None + channel: Optional[str] = None + channel_id: Optional[str] = None + channel_url: Optional[str] = None + type: Optional[str] = None + width: Optional[int] = None + height: Optional[int] = None + resolution: Optional[str] = None + timestamp: Optional[float] = None + fps: Optional[float] = None + audio_channels: Optional[int] = None + view_count: Optional[int] = None + categories: Optional[Iterable[str]] = field(default_factory=list) + tags: Optional[Iterable[str]] = field(default_factory=list) + + @property + def _logger(self): + return logging.getLogger(self.__class__.__name__) + + @property + def _media(self): + """ + This workaround is required to avoid circular imports. + """ + from platypush.plugins.media import MediaPlugin + + assert isinstance(self.media_plugin, MediaPlugin) + return self.media_plugin + + @abstractmethod + def open(self, *_, **__) -> IO: + """ + Opens the media resource. + """ + if self.fd is not None: + try: + self.fd.seek(0) + except io.UnsupportedOperation: + pass + + return self.fd + + def close(self) -> None: + """ + Closes the media resource. + """ + if self.fd is not None: + self.fd.close() + self.fd = None + + def __enter__(self) -> IO: + """ + Opens the media resource using a context manager. + """ + return self.open() + + def __exit__(self, *_, **__) -> None: + """ + Closes the media resource using a context manager. + """ + self.close() + + def to_dict(self) -> dict: + """ + Converts the media resource to a dictionary ready to be serialized. + """ + return { + f.name: getattr(self, f.name) + for f in fields(self) + if f.name not in ['media_plugin', 'fd'] + } + + +@dataclass +class PopenMediaResource(MediaResource, ABC): + """ + Models a media resource that is read from a Popen object. + """ + + proc: Optional[Popen] = None + + def close(self) -> None: + """ + Closes the media resource. + """ + if self.proc is not None: + self.proc.terminate() + self.proc.wait(1) + if self.proc and self.proc.poll() is None: + self.proc.kill() + self.proc.wait(1) + + self.proc = None + + super().close() + + def to_dict(self) -> dict: + """ + Converts the media resource to a dictionary ready to be serialized. + """ + ret = super().to_dict() + ret.pop('proc', None) + return ret + + +# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/media/_resource/downloaders/__init__.py b/platypush/plugins/media/_resource/downloaders/__init__.py new file mode 100644 index 000000000..0a5bcfb9b --- /dev/null +++ b/platypush/plugins/media/_resource/downloaders/__init__.py @@ -0,0 +1,16 @@ +from ._base import DownloadThread, MediaResourceDownloader +from .http import HttpResourceDownloader +from .youtube import YoutubeResourceDownloader + +downloaders = [ + YoutubeResourceDownloader, + HttpResourceDownloader, +] + +__all__ = [ + 'DownloadThread', + 'HttpResourceDownloader', + 'MediaResourceDownloader', + 'YoutubeResourceDownloader', + 'downloaders', +] diff --git a/platypush/plugins/media/_resource/downloaders/_base.py b/platypush/plugins/media/_resource/downloaders/_base.py new file mode 100644 index 000000000..1ae036e67 --- /dev/null +++ b/platypush/plugins/media/_resource/downloaders/_base.py @@ -0,0 +1,208 @@ +import logging +import os +import threading +import time +from abc import ABC, abstractmethod +from typing import Any, Callable, Optional, Type + +from platypush.message.event.media import ( + MediaDownloadCancelledEvent, + MediaDownloadClearEvent, + MediaDownloadCompletedEvent, + MediaDownloadErrorEvent, + MediaDownloadEvent, + MediaDownloadPausedEvent, + MediaDownloadProgressEvent, + MediaDownloadResumedEvent, + MediaDownloadStartedEvent, +) +from platypush.plugins.media._model import DownloadState +from platypush.plugins.media._resource import MediaResource + + +class MediaResourceDownloader(ABC): + """ + Base media resource downloader class. + """ + + def __init__(self, media_plugin, *_, **__): + from platypush.plugins.media import MediaPlugin + + self._media: MediaPlugin = media_plugin + + @abstractmethod + def download( + self, resource: MediaResource, path: Optional[str] = None, **_ + ) -> 'DownloadThread': + pass + + def get_download_path( + self, + resource: MediaResource, + *_, + directory: Optional[str] = None, + filename: Optional[str] = None, + **__, + ) -> str: + directory = ( + os.path.expanduser(directory) if directory else self._media.download_dir + ) + + if not filename: + filename = resource.filename or resource.url.split('/')[-1] + + return os.path.join(directory, filename) + + @abstractmethod + def supports(self, resource_type: MediaResource) -> bool: + return False + + def supports_only_audio(self) -> bool: + return False + + +class DownloadThread(threading.Thread, ABC): + """ + Thread that downloads a URL to a file. + """ + + _progress_update_interval = 1 + """ Throttle the progress updates to this interval, in seconds. """ + + def __init__( + self, + path: str, + url: str, + post_event: Callable, + size: Optional[int] = None, + timeout: Optional[int] = 10, + on_start: Callable[['DownloadThread'], None] = lambda _: None, + on_close: Callable[['DownloadThread'], None] = lambda _: None, + stop_event: Optional[threading.Event] = None, + ): + super().__init__(name=f'DownloadThread-{path}') + self.path = path + self.url = url + self.size = size + self.timeout = timeout + self.state = DownloadState.IDLE + self.progress = None + self.started_at = None + self.ended_at = None + self._upstream_stop_event = stop_event or threading.Event() + self._stop_event = threading.Event() + self._post_event = post_event + self._on_start = on_start + self._on_close = on_close + self._paused = threading.Event() + self._downloading = threading.Event() + self._last_progress_update_time = 0 + self.logger = logging.getLogger(__name__) + + def should_stop(self) -> bool: + return self._stop_event.is_set() or self._upstream_stop_event.is_set() + + @abstractmethod + def _run(self) -> bool: + pass + + def pause(self): + self.state = DownloadState.PAUSED + self._paused.set() + self._downloading.clear() + self.post_event(MediaDownloadPausedEvent) + + def resume(self): + self.state = DownloadState.DOWNLOADING + self._paused.clear() + self._downloading.set() + self.post_event(MediaDownloadResumedEvent) + + def run(self): + super().run() + interrupted = False + + try: + self.on_start() + interrupted = not self._run() + + if interrupted: + self.state = DownloadState.CANCELLED + else: + self.state = DownloadState.COMPLETED + except Exception as e: + self.state = DownloadState.ERROR + self.post_event(MediaDownloadErrorEvent, error=str(e)) + self.logger.warning('Error while downloading URL: %s', e) + finally: + self.on_close() + + def stop(self): + self.state = DownloadState.CANCELLED + self._stop_event.set() + self._downloading.clear() + + def on_start(self): + self.state = DownloadState.STARTED + self.started_at = time.time() + self.post_event(MediaDownloadStartedEvent) + self._on_start(self) + + def on_close(self): + self.ended_at = time.time() + if self.state == DownloadState.CANCELLED: + self.post_event(MediaDownloadCancelledEvent) + elif self.state == DownloadState.COMPLETED: + self.post_event(MediaDownloadCompletedEvent) + + self._on_close(self) + + def clear(self): + if self.state not in (DownloadState.COMPLETED, DownloadState.CANCELLED): + self.logger.info( + 'Download thread for %s is still active, stopping', self.url + ) + + self.stop() + self.join(timeout=10) + + self.post_event(MediaDownloadClearEvent) + + def post_event(self, event_type: Type[MediaDownloadEvent], **kwargs): + kwargs = { + 'resource': self.url, + 'path': self.path, + 'state': self.state.value, + 'size': self.size, + 'timeout': self.timeout, + 'progress': self.progress, + 'started_at': self.started_at, + 'ended_at': self.ended_at, + **kwargs, + } + + self._post_event(event_type, **kwargs) + + def __setattr__(self, name: str, value: Optional[Any], /) -> None: + if name == 'progress' and value is not None: + if value < 0 or value > 100: + self.logger.debug('Invalid progress value:%s', value) + return + + prev_progress = getattr(self, 'progress', None) + + if prev_progress is None or ( + int(prev_progress) != int(value) + and ( + time.time() - self._last_progress_update_time + >= self._progress_update_interval + ) + ): + value = round(value, 2) + self._last_progress_update_time = time.time() + self.post_event(MediaDownloadProgressEvent, progress=value) + + super().__setattr__(name, value) + + +# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/media/_resource/downloaders/http.py b/platypush/plugins/media/_resource/downloaders/http.py new file mode 100644 index 000000000..26349a5c7 --- /dev/null +++ b/platypush/plugins/media/_resource/downloaders/http.py @@ -0,0 +1,73 @@ +from typing import Optional +import requests + +from platypush.plugins.media._resource import HttpMediaResource, MediaResource +from platypush.utils import wait_for_either + +from ._base import DownloadState, DownloadThread, MediaResourceDownloader + + +class HttpResourceDownloader(MediaResourceDownloader): + """ + Downloader for generic HTTP URLs. + """ + + def supports(self, resource_type: MediaResource) -> bool: # type: ignore[override] + return isinstance(resource_type, HttpMediaResource) + + def download( # type: ignore[override] + self, + resource: HttpMediaResource, + path: Optional[str] = None, + timeout: Optional[int] = None, + **_ + ) -> 'HttpDownloadThread': + path = path or self.get_download_path(resource=resource) + download_thread = HttpDownloadThread( + url=resource.url, + path=path, + timeout=timeout, + on_start=self._media.on_download_start, + post_event=self._media.post_event, + stop_event=self._media._should_stop, # pylint: disable=protected-access + ) + + self._media.start_download(download_thread) + return download_thread + + +class HttpDownloadThread(DownloadThread): + """ + Thread that downloads a generic URL to a file. + """ + + def _run(self): + interrupted = False + + with requests.get(self.url, timeout=self.timeout, stream=True) as response: + response.raise_for_status() + self.size = int(response.headers.get('Content-Length', 0)) or None + + with open(self.path, 'wb') as f: + self.on_start() + + for chunk in response.iter_content(chunk_size=8192): + if not chunk or self.should_stop(): + interrupted = self.should_stop() + if interrupted: + self.stop() + + break + + self.state = DownloadState.DOWNLOADING + f.write(chunk) + percent = f.tell() / self.size * 100 if self.size else 0 + self.progress = percent + + if self._paused.is_set(): + wait_for_either(self._downloading, self._stop_event) + + return not interrupted + + +# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/media/_resource/downloaders/youtube.py b/platypush/plugins/media/_resource/downloaders/youtube.py new file mode 100644 index 000000000..40d5ef9a5 --- /dev/null +++ b/platypush/plugins/media/_resource/downloaders/youtube.py @@ -0,0 +1,232 @@ +import json +import signal +import subprocess +import threading +from contextlib import suppress +from typing import Optional + +from platypush.plugins.media._model import DownloadState +from platypush.plugins.media._resource import MediaResource, YoutubeMediaResource +from platypush.utils import wait_for_either + +from ._base import DownloadThread, MediaResourceDownloader + + +class YoutubeResourceDownloader(MediaResourceDownloader): + """ + Downloader for YouTube URLs. + """ + + def supports(self, resource_type: MediaResource) -> bool: + return isinstance(resource_type, YoutubeMediaResource) + + def download( # type: ignore[override] + self, + resource: YoutubeMediaResource, + path: str, + youtube_format: Optional[str] = None, + youtube_audio_format: Optional[str] = None, + merge_output_format: Optional[str] = None, + only_audio: bool = False, + **_, + ) -> 'YouTubeDownloadThread': + path = self.get_download_path( + resource=resource, directory=path, youtube_format=youtube_format + ) + + download_thread = YouTubeDownloadThread( + url=resource.url, + path=path, + ytdl=self._media._ytdl, # pylint: disable=protected-access + only_audio=only_audio, + youtube_format=youtube_format or self._media.youtube_format, + youtube_audio_format=youtube_audio_format + or self._media.youtube_audio_format, + merge_output_format=merge_output_format or self._media.merge_output_format, + on_start=self._media.on_download_start, + post_event=self._media.post_event, + stop_event=self._media._should_stop, # pylint: disable=protected-access + ) + + self._media.start_download(download_thread) + return download_thread + + def get_download_path( + self, + resource: YoutubeMediaResource, + *_, + directory: Optional[str] = None, + filename: Optional[str] = None, + youtube_format: Optional[str] = None, + **__, + ) -> str: + youtube_format = youtube_format or self._media.youtube_format + + if not filename: + filename = resource.filename + + if not filename: + with subprocess.Popen( + [ + self._media._ytdl, # pylint: disable=protected-access + *( + [ + '-f', + youtube_format, + ] + if youtube_format + else [] + ), + '-O', + '%(title)s.%(ext)s', + resource.url, + ], + stdout=subprocess.PIPE, + ) as proc: + assert proc.stdout, 'yt-dlp stdout is None' + filename = proc.stdout.read().decode()[:-1] + + return super().get_download_path( + resource, directory=directory, filename=filename + ) + + def supports_only_audio(self) -> bool: + return True + + +class YouTubeDownloadThread(DownloadThread): + """ + Thread that downloads a YouTube URL to a file. + """ + + def __init__( + self, + *args, + ytdl: str, + youtube_format: Optional[str] = None, + youtube_audio_format: Optional[str] = None, + merge_output_format: Optional[str] = None, + only_audio: bool = False, + **kwargs, + ): + super().__init__(*args, **kwargs) + self._ytdl = ytdl + self._youtube_format = youtube_format + self._youtube_audio_format = youtube_audio_format + self._merge_output_format = merge_output_format + self._only_audio = only_audio + self._proc = None + self._proc_lock = threading.Lock() + + def _parse_progress(self, line: str): + try: + progress = json.loads(line) + except json.JSONDecodeError: + return + + status = progress.get('status') + if not status: + return + + if status == 'finished': + self.progress = 100 + return + + if status == 'paused': + self.state = DownloadState.PAUSED + elif status == 'downloading': + self.state = DownloadState.DOWNLOADING + + self.size = int(progress.get('total_bytes_estimate', 0)) or self.size + if self.size: + downloaded = int(progress.get('downloaded_bytes', 0)) + self.progress = (downloaded / self.size) * 100 + + def _run(self): + youtube_format = ( + self._youtube_audio_format if self._only_audio else self._youtube_format + ) + + ytdl_cmd = [ + self._ytdl, + '--newline', + '--progress', + '--progress-delta', + str(self._progress_update_interval), + '--progress-template', + '%(progress)j', + *(['-x'] if self._only_audio else []), + *(['-f', youtube_format] if youtube_format else []), + self.url, + '-o', + self.path, + ] + + self.logger.info('Executing command %r', ytdl_cmd) + err = None + + with subprocess.Popen(ytdl_cmd, stdout=subprocess.PIPE) as self._proc: + if self._proc.stdout: + for line in self._proc.stdout: + self.logger.debug( + '%s output: %s', self._ytdl, line.decode().strip() + ) + + self._parse_progress(line.decode()) + + if self.should_stop(): + self.stop() + return self._proc.returncode == 0 + + if self._paused.is_set(): + wait_for_either(self._downloading, self._stop_event) + + if self._proc.returncode != 0: + err = self._proc.stderr.read().decode() if self._proc.stderr else None + raise RuntimeError( + f'{self._ytdl} failed with return code {self._proc.returncode}: {err}' + ) + + return True + + def pause(self): + with self._proc_lock: + if self._proc: + self._proc.send_signal(signal.SIGSTOP) + + super().pause() + + def resume(self): + with self._proc_lock: + if self._proc: + self._proc.send_signal(signal.SIGCONT) + + super().resume() + + def stop(self): + state = None + + with suppress(IOError, OSError), self._proc_lock: + if self._proc: + if self._proc.poll() is None: + self._proc.terminate() + self._proc.wait(timeout=3) + if self._proc.returncode is None: + self._proc.kill() + + state = DownloadState.CANCELLED + elif self._proc.returncode != 0: + state = DownloadState.ERROR + else: + state = DownloadState.COMPLETED + + self._proc = None + + super().stop() + + if state: + self.state = state + + def on_close(self): + self.stop() + super().on_close() diff --git a/platypush/plugins/media/_resource/file.py b/platypush/plugins/media/_resource/file.py new file mode 100644 index 000000000..868fc26b7 --- /dev/null +++ b/platypush/plugins/media/_resource/file.py @@ -0,0 +1,23 @@ +from dataclasses import dataclass +from typing import IO + +from ._base import MediaResource + + +@dataclass +class FileMediaResource(MediaResource): + """ + Models a media resource that is read from a file. + """ + + def open(self, *args, **kwargs) -> IO: + """ + Opens the media resource. + """ + if self.fd is None: + self.fd = open(self.resource, 'rb') # pylint: disable=consider-using-with + + return super().open(*args, **kwargs) + + +# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/media/_resource/http.py b/platypush/plugins/media/_resource/http.py new file mode 100644 index 000000000..1aa42715d --- /dev/null +++ b/platypush/plugins/media/_resource/http.py @@ -0,0 +1,19 @@ +from dataclasses import dataclass + +from ._base import MediaResource + + +@dataclass +class HttpMediaResource(MediaResource): + """ + Models a media resource that is read from an HTTP response. + """ + + def open(self, *args, **kwargs): + return super().open(*args, **kwargs) + + def close(self) -> None: + super().close() + + +# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/media/_resource/parsers/__init__.py b/platypush/plugins/media/_resource/parsers/__init__.py new file mode 100644 index 000000000..63caba33a --- /dev/null +++ b/platypush/plugins/media/_resource/parsers/__init__.py @@ -0,0 +1,26 @@ +from ._base import MediaResourceParser +from .file import FileResourceParser +from .http import HttpResourceParser +from .torrent import TorrentResourceParser +from .youtube import YoutubeResourceParser + + +parsers = [ + FileResourceParser, + YoutubeResourceParser, + TorrentResourceParser, + HttpResourceParser, +] + + +__all__ = [ + 'MediaResourceParser', + 'FileResourceParser', + 'HttpResourceParser', + 'TorrentResourceParser', + 'YoutubeResourceParser', + 'parsers', +] + + +# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/media/_resource/parsers/_base.py b/platypush/plugins/media/_resource/parsers/_base.py new file mode 100644 index 000000000..8162fd525 --- /dev/null +++ b/platypush/plugins/media/_resource/parsers/_base.py @@ -0,0 +1,28 @@ +from abc import ABC, abstractmethod +from logging import getLogger +from typing import Optional + +from .. import MediaResource + + +# pylint: disable=too-few-public-methods +class MediaResourceParser(ABC): + """ + Base class for media resource parsers. + """ + + def __init__(self, media_plugin, *_, **__): + from platypush.plugins.media import MediaPlugin + + self._media: MediaPlugin = media_plugin + self.logger = getLogger(self.__class__.__name__) + + @abstractmethod + def parse(self, resource: str, *_, **__) -> Optional[MediaResource]: + """ + Parses a media resource and returns a MediaResource object. + """ + raise NotImplementedError + + +# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/media/_resource/parsers/file.py b/platypush/plugins/media/_resource/parsers/file.py new file mode 100644 index 000000000..a26cb62c4 --- /dev/null +++ b/platypush/plugins/media/_resource/parsers/file.py @@ -0,0 +1,37 @@ +import os +from typing import Optional + +from ..._search.local.metadata import get_file_metadata +from .. import FileMediaResource +from ._base import MediaResourceParser + + +# pylint: disable=too-few-public-methods +class FileResourceParser(MediaResourceParser): + """ + Parser for local file resources. + """ + + def parse(self, resource: str, *_, **__) -> Optional[FileMediaResource]: + if resource.startswith('file://') or os.path.isfile(resource): + path = resource + if resource.startswith('file://'): + path = resource[len('file://') :] + + assert os.path.isfile(path), f'File {path} not found' + metadata = get_file_metadata(path) + metadata['timestamp'] = metadata.pop('created_at', None) + + return FileMediaResource( + resource=path, + url=f'file://{path}', + media_plugin=self._media, + title=os.path.basename(resource), + filename=os.path.basename(resource), + **metadata, + ) + + return None + + +# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/media/_resource/parsers/http.py b/platypush/plugins/media/_resource/parsers/http.py new file mode 100644 index 000000000..2c4f9f89a --- /dev/null +++ b/platypush/plugins/media/_resource/parsers/http.py @@ -0,0 +1,31 @@ +import os +from typing import Optional + +from .. import HttpMediaResource +from ._base import MediaResourceParser + + +# pylint: disable=too-few-public-methods +class HttpResourceParser(MediaResourceParser): + """ + Parser for HTTP resources. + """ + + def parse(self, resource: str, *_, **__) -> Optional[HttpMediaResource]: + if resource.startswith('http://') or resource.startswith('https://'): + assert self._media.is_media_file( + resource + ), f'Invalid media resource: {resource}' + + return HttpMediaResource( + resource=resource, + url=resource, + media_plugin=self._media, + title=os.path.basename(resource), + filename=os.path.basename(resource), + ) + + return None + + +# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/media/_resource/parsers/torrent.py b/platypush/plugins/media/_resource/parsers/torrent.py new file mode 100644 index 000000000..5721c31a1 --- /dev/null +++ b/platypush/plugins/media/_resource/parsers/torrent.py @@ -0,0 +1,62 @@ +import os +import queue +from typing import Optional + +from platypush.context import get_plugin + +from .. import FileMediaResource +from ._base import MediaResourceParser + + +# pylint: disable=too-few-public-methods +class TorrentResourceParser(MediaResourceParser): + """ + Parser for magnet links. + """ + + @staticmethod + def _torrent_event_handler(evt_queue): + def handler(event): + # More than 5% of the torrent has been downloaded + if event.args.get('progress', 0) > 5 and event.args.get('files'): + evt_queue.put( + [f for f in event.args['files'] if f.is_media_file(f.filename)] + ) + + return handler + + def parse(self, resource: str, *_, **__) -> Optional[FileMediaResource]: + if not resource.startswith('magnet:?'): + return None + + torrents = get_plugin(self._media.torrent_plugin) + assert torrents, f'{self._media.torrent_plugin} plugin not configured' + + evt_queue = queue.Queue() + torrents.download( + resource, + download_dir=self._media.download_dir, + _async=True, + is_media=True, + event_hndl=self._torrent_event_handler(evt_queue), + ) + + resources = [f for f in evt_queue.get()] # noqa: C416,R1721 + + if resources: + self._media._videos_queue = videos_queue = sorted(resources) + resource = videos_queue.pop(0) + else: + raise RuntimeError(f'No media file found in torrent {resource}') + + assert resource, 'Unable to find any compatible media resource' + return FileMediaResource( + resource=resource, + url=f'file://{resource}', + media_plugin=self._media, + title=os.path.basename(resource), + filename=os.path.basename(resource), + ) + + +# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/media/_resource/parsers/youtube.py b/platypush/plugins/media/_resource/parsers/youtube.py new file mode 100644 index 000000000..c9987f7bc --- /dev/null +++ b/platypush/plugins/media/_resource/parsers/youtube.py @@ -0,0 +1,119 @@ +import inspect +import json +import re +import subprocess +from dataclasses import fields +from typing import Dict, Optional + +from .. import YoutubeMediaResource +from ._base import MediaResourceParser + + +class YoutubeResourceParser(MediaResourceParser): + """ + Parser for yt-dlp-compatible resources. + """ + + @staticmethod + def _get_extractors(): + try: + from yt_dlp.extractor import _extractors # type: ignore + except ImportError: + # yt-dlp not installed + return + + for _, obj_type in inspect.getmembers(_extractors): + if ( + inspect.isclass(obj_type) + and isinstance(getattr(obj_type, "_VALID_URL", None), str) + and obj_type.__name__ != "GenericIE" + ): + yield obj_type + + @classmethod + def is_youtube_resource(cls, resource: str): + return any( + re.search(getattr(extractor, '_VALID_URL', '^$'), resource) + for extractor in cls._get_extractors() + ) + + @staticmethod + def _get_youtube_best_thumbnail(info: Dict[str, dict]): + thumbnails = info.get('thumbnails', {}) + if not thumbnails: + return None + + # Preferred resolution + for res in ((640, 480), (480, 360), (320, 240)): + thumb = next( + ( + thumb + for thumb in thumbnails + if thumb.get('width') == res[0] and thumb.get('height') == res[1] + ), + None, + ) + + if thumb: + return thumb.get('url') + + # Default fallback (best quality) + return info.get('thumbnail') + + def parse( + self, + resource: str, + *_, + youtube_format: Optional[str] = None, + youtube_audio_format: Optional[str] = None, + only_audio: bool = False, + **__ + ) -> Optional[YoutubeMediaResource]: + if not self.is_youtube_resource(resource): + return None + + youtube_format = youtube_format or self._media.youtube_format + if only_audio: + youtube_format = ( + youtube_audio_format + or self._media.youtube_audio_format + or youtube_format + ) + + ytdl_cmd = [ + self._media._ytdl, # pylint: disable=protected-access + *(['-f', youtube_format] if youtube_format else []), + *(['-x'] if only_audio else []), + '-j', + '-g', + resource, + ] + + self.logger.info('Executing command %s', ' '.join(ytdl_cmd)) + with subprocess.Popen(ytdl_cmd, stdout=subprocess.PIPE) as ytdl: + output = ytdl.communicate()[0].decode().strip() + ytdl.wait() + + self.logger.debug('yt-dlp output: %s', output) + lines = output.split('\n') + + if not lines: + self.logger.warning('No output from yt-dlp') + return None + + info = json.loads(lines[-1]) + args = { + **{ + field.name: info.get(field.name) + for field in fields(YoutubeMediaResource) + }, + 'url': info.get('webpage_url'), + 'image': self._get_youtube_best_thumbnail(info), + 'type': info.get('extractor'), + 'media_plugin': self._media, + } + + return YoutubeMediaResource(**args) + + +# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/media/_resource/youtube.py b/platypush/plugins/media/_resource/youtube.py new file mode 100644 index 000000000..a9347c4b9 --- /dev/null +++ b/platypush/plugins/media/_resource/youtube.py @@ -0,0 +1,145 @@ +import os +import subprocess +from dataclasses import dataclass +from typing import IO, Optional, Sequence + +from ._base import PopenMediaResource + + +@dataclass +class YoutubeMediaResource(PopenMediaResource): + """ + Models a YouTube media resource. + """ + + id: Optional[str] = None + is_temporary: bool = False + + def _generate_file(self, merge_format: str): + self.resource = os.path.join( + self._media.cache_dir, f'platypush-yt-dlp-{self.id}.{merge_format}' + ) + self.is_temporary = True + + def _prepare_file(self, merge_output_format: str): + if not self.resource: + self._generate_file(merge_output_format) + + filename = ( + self.resource[len('file://') :] + if self.resource.startswith('file://') + else self.resource + ) + + # Remove the file if it already exists and it's empty, to avoid YTDL + # errors + if ( + os.path.exists(os.path.abspath(filename)) + and os.path.getsize(os.path.abspath(filename)) == 0 + ): + self._logger.debug('Removing empty file: %s', filename) + os.unlink(os.path.abspath(filename)) + + def open( + self, + *args, + youtube_format: Optional[str] = None, + merge_output_format: Optional[str] = None, + cache_streams: bool = False, + ytdl_args: Optional[Sequence[str]] = None, + **kwargs, + ) -> IO: + if self.proc is None: + merge_output_format = merge_output_format or self._media.merge_output_format + use_file = ( + not self._media.supports_local_pipe or cache_streams or self.resource + ) + + if use_file: + self._prepare_file(merge_output_format=merge_output_format) + + output = ['-o', self.resource] if use_file else ['-o', '-'] + youtube_format = youtube_format or self._media.youtube_format + ytdl_args = ytdl_args or self._media.ytdl_args + cmd = [ + self._media._ytdl, # pylint: disable=protected-access + '--no-part', + *( + [ + '-f', + youtube_format, + ] + if youtube_format + else [] + ), + *( + ['--merge-output-format', merge_output_format] + if merge_output_format + else [] + ), + *output, + *ytdl_args, + self.url, + ] + + proc_args = {} + if not use_file: + proc_args['stdout'] = subprocess.PIPE + + self._logger.debug('Running command: %s', ' '.join(cmd)) + self._logger.debug('Media resource: %s', self.to_dict()) + self.proc = subprocess.Popen( # pylint: disable=consider-using-with + cmd, **proc_args + ) + + if use_file: + self._wait_for_download_start() + self.fd = open( # pylint: disable=consider-using-with + self.resource, 'rb' + ) + elif self.proc.stdout: + self.fd = self.proc.stdout + + return super().open(*args, **kwargs) + + def _wait_for_download_start(self) -> None: + self._logger.info('Waiting for download to start on file: %s', self.resource) + + while True: + file = self.resource + if not file: + self._logger.info('No file found to wait for download') + break + + if not self.proc: + self._logger.info('No download process found to wait') + break + + if self.proc.poll() is not None: + self._logger.info( + 'Download process exited with status %d', self.proc.returncode + ) + break + + # The file must exist and be at least 1MB in size + if os.path.exists(file) and os.path.getsize(file) > 1024 * 1024: + self._logger.info('Download started, process PID: %s', self.proc.pid) + break + + try: + self.proc.wait(1) + except subprocess.TimeoutExpired: + pass + + def close(self) -> None: + super().close() + + if self.is_temporary and self.resource and os.path.exists(self.resource): + try: + self._logger.debug('Removing temporary file: %s', self.resource) + os.unlink(self.resource) + except FileNotFoundError: + pass + + +# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/media/_search/__init__.py b/platypush/plugins/media/_search/__init__.py new file mode 100644 index 000000000..d507292b3 --- /dev/null +++ b/platypush/plugins/media/_search/__init__.py @@ -0,0 +1,59 @@ +import logging +from abc import ABC, abstractmethod +from typing import Optional + +from platypush.plugins import Plugin + + +class MediaSearcher(ABC): + """ + Base class for media searchers + """ + + def __init__(self, *_, media_plugin: Optional[Plugin] = None, **__): + from .. import MediaPlugin + + self.logger = logging.getLogger(self.__class__.__name__) + assert isinstance( + media_plugin, MediaPlugin + ), f'Invalid media plugin: {media_plugin}' + self.media_plugin: Optional[MediaPlugin] = media_plugin + + @abstractmethod + def search(self, query, *args, **kwargs): + raise NotImplementedError( + 'The search method should be implemented by a derived class' + ) + + @abstractmethod + def supports(self, type: str) -> bool: + raise NotImplementedError( + 'The type method should be implemented by a derived class' + ) + + +from .local import LocalMediaSearcher # noqa: E402 +from .youtube import YoutubeMediaSearcher # noqa: E402 +from .torrent import TorrentMediaSearcher # noqa: E402 +from .plex import PlexMediaSearcher # noqa: E402 +from .jellyfin import JellyfinMediaSearcher # noqa: E402 + +searchers = [ + LocalMediaSearcher, + YoutubeMediaSearcher, + TorrentMediaSearcher, + PlexMediaSearcher, + JellyfinMediaSearcher, +] + +__all__ = [ + 'JellyfinMediaSearcher', + 'LocalMediaSearcher', + 'MediaSearcher', + 'PlexMediaSearcher', + 'TorrentMediaSearcher', + 'YoutubeMediaSearcher', +] + + +# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/media/search/jellyfin.py b/platypush/plugins/media/_search/jellyfin.py similarity index 56% rename from platypush/plugins/media/search/jellyfin.py rename to platypush/plugins/media/_search/jellyfin.py index 3a9d8a4e5..c11f95c00 100644 --- a/platypush/plugins/media/search/jellyfin.py +++ b/platypush/plugins/media/_search/jellyfin.py @@ -1,9 +1,16 @@ from platypush.context import get_plugin -from platypush.plugins.media.search import MediaSearcher +from platypush.plugins.media._search import MediaSearcher class JellyfinMediaSearcher(MediaSearcher): - def search(self, query, **_): + """ + Jellyfin media searcher. + """ + + def supports(self, type: str) -> bool: + return type == 'jellyfin' + + def search(self, query, *_, **__): """ Performs a search on a Jellyfin server using the configured :class:`platypush.plugins.media.jellyfin.MediaJellyfinPlugin` @@ -18,9 +25,11 @@ def search(self, query, **_): if not media: return [] - self.logger.info('Searching Jellyfin for "{}"'.format(query)) + self.logger.info('Searching Jellyfin for "%s"', query) results = media.search(query=query).output - self.logger.info('{} Jellyfin results found for the search query "{}"'.format(len(results), query)) + self.logger.info( + '%d Jellyfin results found for the search query "%s"', len(results), query + ) return results diff --git a/platypush/plugins/media/search/local/__init__.py b/platypush/plugins/media/_search/local/__init__.py similarity index 98% rename from platypush/plugins/media/search/local/__init__.py rename to platypush/plugins/media/_search/local/__init__.py index 628fc7e09..e1d482da0 100644 --- a/platypush/plugins/media/search/local/__init__.py +++ b/platypush/plugins/media/_search/local/__init__.py @@ -8,8 +8,7 @@ from sqlalchemy.sql.expression import func from platypush.config import Config -from platypush.plugins.media import MediaPlugin -from platypush.plugins.media.search import MediaSearcher +from platypush.plugins.media._search import MediaSearcher from .db import ( Base, @@ -43,6 +42,9 @@ def __init__(self, dirs, *args, **kwargs): self.db_file = os.path.join(db_dir, 'media.db') self._db_engine = None + def supports(self, type: str) -> bool: + return type == 'file' + def _get_db_session(self): if not self._db_engine: self._db_engine = create_engine( @@ -124,6 +126,7 @@ def scan(self, media_dir, session=None, dir_record=None): Scans a media directory and stores the search results in the internal SQLite index """ + from platypush.plugins.media import MediaPlugin if not session: session = self._get_db_session() diff --git a/platypush/plugins/media/search/local/db.py b/platypush/plugins/media/_search/local/db.py similarity index 100% rename from platypush/plugins/media/search/local/db.py rename to platypush/plugins/media/_search/local/db.py diff --git a/platypush/plugins/media/search/local/metadata.py b/platypush/plugins/media/_search/local/metadata.py similarity index 95% rename from platypush/plugins/media/search/local/metadata.py rename to platypush/plugins/media/_search/local/metadata.py index 574590d7b..da27b4122 100644 --- a/platypush/plugins/media/search/local/metadata.py +++ b/platypush/plugins/media/_search/local/metadata.py @@ -3,11 +3,11 @@ import logging import multiprocessing import os +import shutil import subprocess from concurrent.futures import ProcessPoolExecutor from dateutil.parser import isoparse -import shutil logger = logging.getLogger(__name__) @@ -58,6 +58,8 @@ def get_file_metadata(path: str): 'duration': ret.get('format', {}).get('duration'), 'width': video_stream.get('width'), 'height': video_stream.get('height'), + 'resolution': f"{video_stream.get('width')}x{video_stream.get('height')}", + 'size': os.path.getsize(path), 'created_at': creation_time, } diff --git a/platypush/plugins/media/_search/plex.py b/platypush/plugins/media/_search/plex.py new file mode 100644 index 000000000..49d68830b --- /dev/null +++ b/platypush/plugins/media/_search/plex.py @@ -0,0 +1,91 @@ +from platypush.context import get_plugin +from platypush.plugins.media._search import MediaSearcher + + +# pylint: disable=too-few-public-methods +class PlexMediaSearcher(MediaSearcher): + """ + Plex media searcher. + """ + + def supports(self, type: str) -> bool: + return type == 'plex' + + def search(self, query: str, *_, **__): + """ + Performs a Plex search using the configured :class:`platypush.plugins.media.plex.MediaPlexPlugin` instance if + it is available. + """ + + try: + plex = get_plugin('media.plex') + except RuntimeError: + return [] + + assert plex, 'No Plex plugin configured' + self.logger.info('Searching Plex for "%s"', query) + results = [] + + for result in plex.search(title=query).output: + results.extend(self._flatten_result(result)) + + self.logger.info( + '%d Plex results found for the search query "%s"', len(results), query + ) + return results + + @staticmethod + def _flatten_result(result): + def parse_part(media, part, episode=None, sub_media=None): + if 'episodes' in media: + del media['episodes'] + + return { + **{k: v for k, v in result.items() if k not in ['media', 'type']}, + 'media_type': result.get('type'), + 'type': 'plex', + **{k: v for k, v in media.items() if k not in ['parts']}, + **part, + 'title': ( + result.get('title', '') + + ( + ' [' + (episode or {}).get('season_episode', '') + ']' + if (episode or {}).get('season_episode') + else '' + ) + + ( + ' ' + (sub_media or {}).get('title', '') + if (sub_media or {}).get('title') + else '' + ), + ), + 'summary': ( + (episode or {}).get('summary') + if (episode or {}).get('summary') + else media.get('summary') + ), + } + + results = [] + + for media in result.get('media', []): + if 'episodes' in media: + for episode in media['episodes']: + for sub_media in episode.get('media', []): + for part in sub_media.get('parts', []): + results.append( + parse_part( + media=media, + episode=episode, + sub_media=sub_media, + part=part, + ) + ) + else: + for part in media.get('parts', []): + results.append(parse_part(media=media, part=part)) + + return results + + +# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/media/search/torrent.py b/platypush/plugins/media/_search/torrent.py similarity index 56% rename from platypush/plugins/media/search/torrent.py rename to platypush/plugins/media/_search/torrent.py index f83f750e8..f10658d40 100644 --- a/platypush/plugins/media/search/torrent.py +++ b/platypush/plugins/media/_search/torrent.py @@ -1,14 +1,22 @@ from platypush.context import get_plugin -from platypush.plugins.media.search import MediaSearcher +from platypush.plugins.media._search import MediaSearcher +# pylint: disable=too-few-public-methods class TorrentMediaSearcher(MediaSearcher): - def search(self, query, **kwargs): - self.logger.info('Searching torrents for "{}"'.format(query)) + """ + Media searcher for torrents. + + It needs at least one torrent plugin to be configured. + """ + + def search(self, query: str, *_, **__): + self.logger.info('Searching torrents for "%s"', query) torrents = get_plugin( self.media_plugin.torrent_plugin if self.media_plugin else 'torrent' ) + if not torrents: raise RuntimeError('Torrent plugin not available/configured') @@ -20,5 +28,8 @@ def search(self, query, **kwargs): if torrent.get('is_media') ] + def supports(self, type: str) -> bool: + return type == 'torrent' + # vim:sw=4:ts=4:et: diff --git a/platypush/plugins/media/search/youtube.py b/platypush/plugins/media/_search/youtube.py similarity index 79% rename from platypush/plugins/media/search/youtube.py rename to platypush/plugins/media/_search/youtube.py index 091563233..8ca53daff 100644 --- a/platypush/plugins/media/search/youtube.py +++ b/platypush/plugins/media/_search/youtube.py @@ -1,5 +1,5 @@ from platypush.context import get_plugin -from platypush.plugins.media.search import MediaSearcher +from platypush.plugins.media._search import MediaSearcher # pylint: disable=too-few-public-methods @@ -18,5 +18,8 @@ def search(self, query: str, *_, **__): assert yt, 'YouTube plugin not available/configured' return yt.search(query=query).output + def supports(self, type: str) -> bool: + return type == 'youtube' + # vim:sw=4:ts=4:et: diff --git a/platypush/plugins/media/_state.py b/platypush/plugins/media/_state.py deleted file mode 100644 index e41310ec4..000000000 --- a/platypush/plugins/media/_state.py +++ /dev/null @@ -1,12 +0,0 @@ -import enum - - -class PlayerState(enum.Enum): - """ - Models the possible states of a media player - """ - - STOP = 'stop' - PLAY = 'play' - PAUSE = 'pause' - IDLE = 'idle' diff --git a/platypush/plugins/media/chromecast/__init__.py b/platypush/plugins/media/chromecast/__init__.py index 8c9727076..b1a27542b 100644 --- a/platypush/plugins/media/chromecast/__init__.py +++ b/platypush/plugins/media/chromecast/__init__.py @@ -1,4 +1,5 @@ -from typing import Optional +from typing import Dict, Optional, Sequence +from uuid import UUID from pychromecast import ( CastBrowser, @@ -10,9 +11,14 @@ from platypush.backend.http.app.utils import get_remote_base_url from platypush.plugins import RunnablePlugin, action -from platypush.plugins.media import MediaPlugin +from platypush.plugins.media import MediaPlugin, MediaResource +from platypush.plugins.media._resource.youtube import YoutubeMediaResource from platypush.utils import get_mime_type -from platypush.message.event.media import MediaPlayRequestEvent +from platypush.message.event.media import ( + MediaEvent, + MediaPlayRequestEvent, + MediaStopEvent, +) from ._listener import MediaListener from ._subtitles import SubtitlesAsyncHandler @@ -29,22 +35,49 @@ class MediaChromecastPlugin(MediaPlugin, RunnablePlugin): STREAM_TYPE_LIVE = "LIVE" def __init__( - self, chromecast: Optional[str] = None, poll_interval: float = 30, **kwargs + self, + chromecast: Optional[str] = None, + poll_interval: float = 30, + youtube_format: Optional[str] = 'bv[width<=?1080][ext=mp4]+ba[ext=m4a]/bv+ba', + merge_output_format: str = 'mp4', + # Transcode to H.264/AAC to maximimze compatibility with Chromecast codecs + ytdl_args: Optional[Sequence[str]] = ( + '--use-postprocessor', + 'FFmpegCopyStream', + '--ppa', + 'CopyStream:"-c:v libx264 -preset veryfast -crf 28 +faststart -c:a aac"', + ), + use_ytdl: bool = True, + **kwargs, ): """ :param chromecast: Default Chromecast to cast to if no name is specified. :param poll_interval: How often the plugin should poll for new/removed Chromecast devices (default: 30 seconds). + :param use_ytdl: Use youtube-dl to download the media if we are dealing + with formats compatible with youtube-dl/yt-dlp. Disable this option + if you experience issues with the media playback on the Chromecast, + such as media with no video, no audio or both. This option will + disable muxing+transcoding over the HTTP server and will stream the + URL directly to the Chromecast. """ - super().__init__(poll_interval=poll_interval, **kwargs) + super().__init__( + poll_interval=poll_interval, + youtube_format=youtube_format, + merge_output_format=merge_output_format, + ytdl_args=ytdl_args, + **kwargs, + ) self._is_local = False self.chromecast = chromecast self._chromecasts_by_uuid = {} self._chromecasts_by_name = {} self._media_listeners = {} + self._latest_resources_by_device: Dict[UUID, MediaResource] = {} self._zc = None self._browser = None + self._use_ytdl = use_ytdl @property def zc(self): @@ -90,6 +123,9 @@ def _serialize_device(self, cc: Chromecast) -> dict: else: raise RuntimeError('Invalid Chromecast object') + resource = self._latest_resources_by_device.get(cc.uuid) + resource_dump = resource.to_dict() if resource else {} + return { 'type': cc.cast_type, 'name': cc.name, @@ -111,19 +147,25 @@ def _serialize_device(self, cc: Chromecast) -> dict: 'volume': round(100 * cc.status.volume_level, 2), 'muted': cc.status.volume_muted, **convert_status(cc.media_controller.status), + **resource_dump, } if cc.status else {} ), } - def _event_callback(self, _, cast: Chromecast): + def _event_callback(self, evt: MediaEvent, cast: Chromecast): + if isinstance(evt, MediaStopEvent): + resource = self._latest_resources_by_device.pop(cast.uuid, None) + if resource: + resource.close() + self._chromecasts_by_uuid[cast.uuid] = cast self._chromecasts_by_name[ self._get_device_property(cast, 'friendly_name') ] = cast - def get_chromecast(self, chromecast=None): + def get_chromecast(self, chromecast=None) -> Chromecast: if isinstance(chromecast, Chromecast): return chromecast @@ -151,7 +193,9 @@ def play( subtitles_lang: str = 'en-US', subtitles_mime: str = 'text/vtt', subtitle_id: int = 1, - **__, + youtube_format: Optional[str] = None, + use_ytdl: Optional[bool] = None, + **kwargs, ): """ Cast media to an available Chromecast device. @@ -171,6 +215,8 @@ def play( :param subtitles_lang: Subtitles language (default: en-US) :param subtitles_mime: Subtitles MIME type (default: text/vtt) :param subtitle_id: ID of the subtitles to be loaded (default: 1) + :param youtube_format: Override the default YouTube format. + :param use_ytdl: Override the default use_ytdl setting for this call. """ if not chromecast: @@ -179,10 +225,26 @@ def play( post_event(MediaPlayRequestEvent, resource=resource, device=chromecast) cast = self.get_chromecast(chromecast) mc = cast.media_controller - resource = self._get_resource(resource) + media = self._latest_resource = self._latest_resources_by_device[ + cast.uuid + ] = self._get_resource(resource, **kwargs) + + youtube_format = youtube_format or self.youtube_format + use_ytdl = use_ytdl if use_ytdl is not None else self._use_ytdl + ytdl_args = kwargs.pop('ytdl_args', self.ytdl_args) + + if isinstance(media, YoutubeMediaResource) and not use_ytdl: + # Use the original URL if it's a YouTube video and we're not using youtube-dl + media.resource = media.url + else: + media.open(youtube_format=youtube_format, ytdl_args=ytdl_args, **kwargs) + + assert media.resource, 'No playable resource found' + resource = media.resource + self.logger.debug('Opened media resource: %s', media.to_dict()) if not content_type: - content_type = get_mime_type(resource) + content_type = get_mime_type(media.resource) if not content_type: raise RuntimeError(f'content_type required to process media {resource}') @@ -192,19 +254,13 @@ def play( resource = get_remote_base_url() + resource self.logger.info('HTTP media stream started on %s', resource) - if self._latest_resource: - if not title: - title = self._latest_resource.title - if not image_url: - image_url = self._latest_resource.image - self.logger.info('Playing %s on %s', resource, chromecast) mc.play_media( resource, content_type, - title=self._latest_resource.title if self._latest_resource else title, - thumb=image_url, + title=title or media.title, + thumb=image_url or media.image, current_time=current_time, autoplay=autoplay, stream_type=stream_type, @@ -629,6 +685,15 @@ def _refresh_chromecasts(self): self._chromecasts_by_uuid[cc.uuid] = cc self._chromecasts_by_name[name] = cc + @property + def supports_local_media(self) -> bool: + # Chromecasts can't play local media: they always need an HTTP URL + return False + + @property + def supports_local_pipe(self) -> bool: + return False + def main(self): while not self.should_stop(): try: diff --git a/platypush/plugins/media/chromecast/_listener.py b/platypush/plugins/media/chromecast/_listener.py index fd78efe12..5c0419e80 100644 --- a/platypush/plugins/media/chromecast/_listener.py +++ b/platypush/plugins/media/chromecast/_listener.py @@ -1,8 +1,9 @@ import logging import time -from typing import Optional +from typing import Optional, Type from platypush.message.event.media import ( + MediaEvent, MediaPlayEvent, MediaStopEvent, MediaPauseEvent, @@ -42,7 +43,10 @@ def new_media_status(self, status): self._post_event(MediaPlayEvent) elif state == 'pause': self._post_event(MediaPauseEvent) - elif state in ('stop', 'idle'): + elif state in ('stop', 'idle') and self.status.get('state') in ( + 'play', + 'pause', + ): self._post_event(MediaStopEvent) if status.get('volume') != self.status.get('volume'): @@ -62,7 +66,7 @@ def new_media_status(self, status): def load_media_failed(self, item, error_code): logger.warning('Failed to load media %s: %d', item, error_code) - def _post_event(self, evt_type, **evt): + def _post_event(self, evt_type: Type[MediaEvent], **evt): status = evt.get('status', {}) resource = status.get('url') args = { diff --git a/platypush/plugins/media/gstreamer/__init__.py b/platypush/plugins/media/gstreamer/__init__.py index fbf60e324..7d8af6536 100644 --- a/platypush/plugins/media/gstreamer/__init__.py +++ b/platypush/plugins/media/gstreamer/__init__.py @@ -1,4 +1,3 @@ -from dataclasses import asdict import os from typing import Optional @@ -6,6 +5,7 @@ from platypush.message.event.media import MediaPlayRequestEvent, MediaVolumeChangedEvent from platypush.plugins import action +from platypush.plugins.media import MediaResource from platypush.plugins.media.gstreamer.model import MediaPipeline @@ -21,20 +21,61 @@ def __init__(self, sink: Optional[str] = None, *args, **kwargs): super().__init__(*args, **kwargs) self.sink = sink self._player: Optional[MediaPipeline] = None - self._resource: Optional[str] = None - def _allocate_pipeline(self, resource: str) -> MediaPipeline: + def _allocate_pipeline(self, resource: MediaResource) -> MediaPipeline: pipeline = MediaPipeline(resource) if self.sink: sink = pipeline.add_sink(self.sink, sync=False) pipeline.link(pipeline.get_source(), sink) self._player = pipeline - self._resource = resource + self._latest_resource = resource return pipeline + def _status(self) -> dict: + if not self._player: + return {'state': PlayerState.STOP.value} + + pos = self._player.get_position() + length = self._player.get_duration() + + status = { + 'duration': length, + 'mute': self._player.is_muted(), + 'pause': self._player.is_paused(), + 'percent_pos': ( + pos / length + if pos is not None and length is not None and pos >= 0 and length > 0 + else 0 + ), + 'position': pos, + 'seekable': length is not None and length > 0, + 'state': self._gst_to_player_state(self._player.get_state()).value, + 'volume': self._player.get_volume() * 100, + } + + if self._latest_resource: + status.update( + { + k: v + for k, v in self._latest_resource.to_dict().items() + if v is not None + } + ) + + return status + + def _get_volume(self) -> float: + assert self._player, 'No instance is running' + return self._player.get_volume() * 100.0 + + def _set_position(self, position: float) -> dict: + assert self._player, 'No instance is running' + self._player.seek(position) + return self._status() + @action - def play(self, resource: Optional[str] = None, **_): + def play(self, resource: Optional[str] = None, **kwargs): """ Play a resource. @@ -44,36 +85,45 @@ def play(self, resource: Optional[str] = None, **_): if not resource: if self._player: self._player.play() - return + return self._status() - resource = self._get_resource(resource) - path = os.path.abspath(os.path.expanduser(resource)) - if os.path.exists(path): - resource = 'file://' + path + self._bus.post( + MediaPlayRequestEvent( + player='local', plugin='media.gstreamer', resource=resource + ) + ) + media = self._latest_resource = self._get_resource(resource, **kwargs) + media.open(**kwargs) + if media.resource and os.path.isfile(os.path.abspath(media.resource)): + media.resource = 'file://' + media.resource - MediaPipeline.post_event(MediaPlayRequestEvent, resource=resource) - pipeline = self._allocate_pipeline(resource) + pipeline = self._allocate_pipeline(media) pipeline.play() if self.volume: pipeline.set_volume(self.volume / 100.0) - return self.status() + return self._status() @action - def pause(self): + def pause(self, *_, **__): """Toggle the paused state""" assert self._player, 'No instance is running' self._player.pause() - return self.status() + return self._status() @action - def quit(self): + def quit(self, *_, **__): """Stop and quit the player (alias for :meth:`.stop`)""" - self._stop_torrent() - assert self._player, 'No instance is running' + if self._latest_resource: + self._latest_resource.close() + self._latest_resource = None + + if self._player: + self._player.stop() + self._player = None + else: + self.logger.info('No instance is running') - self._player.stop() - self._player = None return {'state': PlayerState.STOP.value} @action @@ -84,14 +134,12 @@ def stop(self): @action def voldown(self, step=10.0): """Volume down by (default: 10)%""" - # noinspection PyUnresolvedReferences - return self.set_volume(self.get_volume().output - step) + return self.set_volume(self._get_volume() - step) @action def volup(self, step=10.0): """Volume up by (default: 10)%""" - # noinspection PyUnresolvedReferences - return self.set_volume(self.get_volume().output + step) + return self.set_volume(self._get_volume() + step) @action def get_volume(self) -> float: @@ -111,11 +159,10 @@ def set_volume(self, volume): :param volume: Volume value between 0 and 100. """ assert self._player, 'Player not running' - # noinspection PyTypeChecker volume = max(0, min(1, volume / 100.0)) self._player.set_volume(volume) - MediaPipeline.post_event(MediaVolumeChangedEvent, volume=volume * 100) - return self.status() + self._player.post_event(MediaVolumeChangedEvent, volume=volume * 100) + return self._status() @action def seek(self, position: float) -> dict: @@ -126,7 +173,8 @@ def seek(self, position: float) -> dict: """ assert self._player, 'No instance is running' cur_pos = self._player.get_position() - return self.set_position(cur_pos + position) + # return self._set_position(cur_pos + position) + return self._set_position((cur_pos or 0) + float(position)) @action def back(self, offset=60.0): @@ -174,60 +222,25 @@ def set_position(self, position: float) -> dict: """ assert self._player, 'No instance is running' self._player.seek(position) - return self.status() + return self._status() @action def status(self) -> dict: """ Get the current player state. """ - if not self._player: - return {'state': PlayerState.STOP.value} - - pos = self._player.get_position() - length = self._player.get_duration() - - status = { - 'duration': length, - 'filename': self._resource[7:] - if self._resource.startswith('file://') - else self._resource, - 'mute': self._player.is_muted(), - 'name': self._resource, - 'pause': self._player.is_paused(), - 'percent_pos': pos / length - if pos is not None and length is not None and pos >= 0 and length > 0 - else 0, - 'position': pos, - 'seekable': length is not None and length > 0, - 'state': self._gst_to_player_state(self._player.get_state()).value, - 'url': self._resource, - 'volume': self._player.get_volume() * 100, - } - - if self._latest_resource: - status.update( - { - k: v - for k, v in asdict(self._latest_resource).items() - if v is not None - } - ) - - return status + return self._status() @staticmethod def _gst_to_player_state(state) -> PlayerState: - # noinspection PyUnresolvedReferences,PyPackageRequirements - from gi.repository import Gst + from gi.repository import Gst # type: ignore - if state == Gst.State.READY: - return PlayerState.STOP if state == Gst.State.PAUSED: return PlayerState.PAUSE if state == Gst.State.PLAYING: return PlayerState.PLAY - return PlayerState.IDLE + + return PlayerState.STOP def toggle_subtitles(self, *_, **__): raise NotImplementedError diff --git a/platypush/plugins/media/gstreamer/model.py b/platypush/plugins/media/gstreamer/model.py index 9c1341e6c..95c002df8 100644 --- a/platypush/plugins/media/gstreamer/model.py +++ b/platypush/plugins/media/gstreamer/model.py @@ -2,41 +2,64 @@ from platypush.common.gstreamer import Pipeline from platypush.context import get_bus -from platypush.message.event.media import MediaEvent, MediaPlayEvent, MediaPauseEvent, MediaStopEvent, \ - NewPlayingMediaEvent, MediaMuteChangedEvent, MediaSeekEvent +from platypush.message.event.media import ( + MediaEvent, + MediaPlayEvent, + MediaPauseEvent, + MediaStopEvent, + NewPlayingMediaEvent, + MediaMuteChangedEvent, + MediaSeekEvent, +) +from platypush.plugins.media import MediaResource class MediaPipeline(Pipeline): - def __init__(self, resource: str): + def __init__(self, resource: MediaResource): super().__init__() + self.resource = resource - self.add_source('playbin', uri=resource) + self._first_play = True + if resource.resource and resource.fd is None: + self.add_source('playbin', uri=resource.resource) + elif resource.fd is not None: + self.add_source('playbin', uri=f'fd://{resource.fd.fileno()}') + else: + raise AssertionError('No resource specified') - @staticmethod - def post_event(evt_class: Type[MediaEvent], **kwargs): + def post_event(self, evt_class: Type[MediaEvent], **kwargs): kwargs['player'] = 'local' kwargs['plugin'] = 'media.gstreamer' + + if self.resource: + resource_args = self.resource.to_dict() + resource_args.pop('type', None) + kwargs.update(resource_args) + evt = evt_class(**kwargs) get_bus().post(evt) + def on_play(self): + super().on_play() + if self._first_play: + self.post_event(NewPlayingMediaEvent, resource=self.resource) + self._first_play = False + + self.post_event(MediaPlayEvent) + + def on_pause(self): + super().on_pause() + self.post_event(MediaPauseEvent) + def play(self): - # noinspection PyUnresolvedReferences,PyPackageRequirements from gi.repository import Gst - is_first_play = self.get_state() == Gst.State.NULL + self._first_play = self.get_state() == Gst.State.NULL super().play() - if is_first_play: - self.post_event(NewPlayingMediaEvent, resource=self.resource) - self.post_event(MediaPlayEvent, resource=self.resource) - - def pause(self): - # noinspection PyUnresolvedReferences,PyPackageRequirements - from gi.repository import Gst - super().pause() - self.post_event(MediaPauseEvent if self.get_state() == Gst.State.PAUSED else MediaPlayEvent) def stop(self): super().stop() + self._first_play = True self.post_event(MediaStopEvent) def mute(self): @@ -48,7 +71,20 @@ def unmute(self): self.post_event(MediaMuteChangedEvent, mute=self.is_muted()) def seek(self, position: float): - super().seek(position) + from gi.repository import Gst + + if not self.source: + self.logger.info('Cannot seek on a pipeline without a source') + return + + position = max(0, position) + duration = self.get_duration() + if duration and position > duration: + position = duration + + cur_pos = self.get_position() or 0 + seek_ns = int((position - cur_pos) * 1e9) + self.source.seek_simple(Gst.Format.TIME, Gst.SeekFlags.FLUSH, seek_ns) self.post_event(MediaSeekEvent, position=self.get_position()) diff --git a/platypush/plugins/media/kodi/__init__.py b/platypush/plugins/media/kodi/__init__.py index c5638bd6d..e5273c024 100644 --- a/platypush/plugins/media/kodi/__init__.py +++ b/platypush/plugins/media/kodi/__init__.py @@ -1,6 +1,8 @@ import json import threading import time +from typing import Optional +from urllib.parse import urlparse from platypush.context import get_bus from platypush.plugins import action @@ -19,42 +21,36 @@ class MediaKodiPlugin(MediaPlugin): Plugin to interact with a Kodi media player instance """ - # noinspection HttpUrlsUsage def __init__( self, - host, - http_port=8080, - websocket_port=9090, - username=None, - password=None, + rpc_url: str = 'http://localhost:8080/jsonrpc', + websocket_port: int = 9090, + username: Optional[str] = None, + password: Optional[str] = None, **kwargs, ): """ - :param host: Kodi host name or IP - :type host: str - - :param http_port: Kodi JSON RPC web port. Remember to enable "Allow remote control via HTTP" - in Kodi service settings -> advanced configuration and "Allow remote control from applications" - on this system and, optionally, on other systems if the Kodi server is on another machine - :type http_port: int - + :param rpc_url: Base URL for the Kodi JSON RPC API (default: + http://localhost:8080/jsonrpc). You need to make sure that the RPC + API is enabled on your Kodi instance - you can enable it from the + settings. :param websocket_port: Kodi JSON RPC websocket port, used to receive player events - :type websocket_port: int - :param username: Kodi username (optional) - :type username: str - :param password: Kodi password (optional) - :type password: str """ super().__init__(**kwargs) - self.host = host - self.http_port = http_port + self.url = rpc_url + host, port = kwargs.get('host'), kwargs.get('port', 8080) + + if host and port: + self.logger.warning('host and port are deprecated, use rpc_url instead') + self.url = f'http://{host}:{port}/jsonrpc' + + self.host = urlparse(self.url).hostname self.websocket_port = websocket_port - self.url = 'http://{}:{}/jsonrpc'.format(host, http_port) - self.websocket_url = 'ws://{}:{}/jsonrpc'.format(host, websocket_port) + self.websocket_url = f'ws://{self.host}:{websocket_port}/jsonrpc' self.username = username self.password = password self._ws = None @@ -113,7 +109,7 @@ def _post_event(self, evt_type, **evt): def _on_ws_msg(self): def hndl(*args): msg = args[1] if len(args) > 1 else args[0] - self.logger.info("Received Kodi message: {}".format(msg)) + self.logger.info("Received Kodi message: %s", msg) msg = json.loads(msg) method = msg.get('method') @@ -138,6 +134,7 @@ def hndl(*args): elif method == 'Player.OnStop': player = msg.get('params', {}).get('data', {}).get('player', {}) self._post_event(MediaStopEvent, player_id=player.get('playerid')) + self._clear_resource() elif method == 'Player.OnSeek': player = msg.get('params', {}).get('data', {}).get('player', {}) position = self._time_obj_to_pos(player.get('seekoffset')) @@ -153,7 +150,7 @@ def hndl(*args): def _on_ws_error(self): def hndl(*args): error = args[1] if len(args) > 1 else args[0] - self.logger.warning("Kodi websocket connection error: {}".format(error)) + self.logger.warning("Kodi websocket connection error: %s", error) return hndl @@ -171,25 +168,28 @@ def _build_result(self, result): status['result'] = result.get('result') return status, result.get('error') + def _clear_resource(self): + if self._latest_resource: + self._latest_resource.close() + self._latest_resource = None + @action - def play(self, resource, *args, **kwargs): + def play(self, resource: str, **kwargs): """ Open and play the specified file or URL :param resource: URL or path to the media to be played """ - - if resource.startswith('file://'): - resource = resource[7:] - - result = self._get_kodi().Player.Open(item={'file': resource}) + media = self._latest_resource = self._get_resource(resource, **kwargs) + media.open(**kwargs) + result = self._get_kodi().Player.Open(item={'file': media.resource}) if self.volume: self.set_volume(volume=int(self.volume)) return self._build_result(result) @action - def pause(self, player_id=None, *args, **kwargs): + def pause(self, player_id=None, **_): """ Play/pause the current media """ @@ -212,7 +212,7 @@ def get_active_players(self): return result.get('result'), result.get('error') @action - def get_movies(self, *args, **kwargs): + def get_movies(self, **_): """ Get the list of movies on the Kodi server """ @@ -221,7 +221,7 @@ def get_movies(self, *args, **kwargs): return result.get('result'), result.get('error') @action - def stop(self, player_id=None, *args, **kwargs): + def stop(self, player_id=None, **_): """ Stop the current media """ @@ -232,10 +232,11 @@ def stop(self, player_id=None, *args, **kwargs): return None, 'No active players found' result = self._get_kodi().Player.Stop(playerid=player_id) + self._clear_resource() return self._build_result(result) @action - def notify(self, title, message, *args, **kwargs): + def notify(self, title, message, **_): """ Send a notification to the Kodi UI """ @@ -244,7 +245,7 @@ def notify(self, title, message, *args, **kwargs): return result.get('result'), result.get('error') @action - def left(self, *args, **kwargs): + def left(self, **_): """ Simulate a left input event """ @@ -253,7 +254,7 @@ def left(self, *args, **kwargs): return result.get('result'), result.get('error') @action - def right(self, *args, **kwargs): + def right(self, **_): """ Simulate a right input event """ @@ -262,7 +263,7 @@ def right(self, *args, **kwargs): return result.get('result'), result.get('error') @action - def up(self, *args, **kwargs): + def up(self, **_): """ Simulate an up input event """ @@ -271,7 +272,7 @@ def up(self, *args, **kwargs): return result.get('result'), result.get('error') @action - def down(self, *args, **kwargs): + def down(self, **_): """ Simulate a down input event """ @@ -280,7 +281,7 @@ def down(self, *args, **kwargs): return result.get('result'), result.get('error') @action - def back_btn(self, *args, **kwargs): + def back_btn(self, **_): """ Simulate a back input event """ @@ -289,7 +290,7 @@ def back_btn(self, *args, **kwargs): return result.get('result'), result.get('error') @action - def select(self, *args, **kwargs): + def select(self, **_): """ Simulate a select input event """ @@ -298,7 +299,7 @@ def select(self, *args, **kwargs): return result.get('result'), result.get('error') @action - def send_text(self, text, *args, **kwargs): + def send_text(self, text, **_): """ Simulate a send_text input event @@ -310,13 +311,13 @@ def send_text(self, text, *args, **kwargs): return result.get('result'), result.get('error') @action - def get_volume(self, *args, **kwargs): + def get_volume(self, **_): result = self._get_kodi().Application.GetProperties(properties=['volume']) return result.get('result'), result.get('error') @action - def volup(self, step=10.0, *args, **kwargs): + def volup(self, step=10.0, **_): """Volume up (default: +10%)""" volume = ( self._get_kodi() @@ -331,7 +332,7 @@ def volup(self, step=10.0, *args, **kwargs): return self._build_result(result) @action - def voldown(self, step=10.0, *args, **kwargs): + def voldown(self, step=10.0, **_): """Volume down (default: -10%)""" volume = ( self._get_kodi() @@ -346,7 +347,7 @@ def voldown(self, step=10.0, *args, **kwargs): return self._build_result(result) @action - def set_volume(self, volume, *args, **kwargs): + def set_volume(self, volume, **_): """ Set the application volume @@ -358,7 +359,7 @@ def set_volume(self, volume, *args, **kwargs): return self._build_result(result) @action - def mute(self, *args, **kwargs): + def mute(self, **_): """ Mute/unmute the application """ @@ -374,7 +375,7 @@ def mute(self, *args, **kwargs): return self._build_result(result) @action - def is_muted(self, *args, **kwargs): + def is_muted(self, **_): """ Return the muted status of the application """ @@ -383,7 +384,7 @@ def is_muted(self, *args, **kwargs): return result.get('result') @action - def scan_video_library(self, *args, **kwargs): + def scan_video_library(self, **_): """ Scan the video library """ @@ -392,7 +393,7 @@ def scan_video_library(self, *args, **kwargs): return result.get('result'), result.get('error') @action - def scan_audio_library(self, *args, **kwargs): + def scan_audio_library(self, **_): """ Scan the audio library """ @@ -401,7 +402,7 @@ def scan_audio_library(self, *args, **kwargs): return result.get('result'), result.get('error') @action - def clean_video_library(self, *args, **kwargs): + def clean_video_library(self, **_): """ Clean the video library """ @@ -410,7 +411,7 @@ def clean_video_library(self, *args, **kwargs): return result.get('result'), result.get('error') @action - def clean_audio_library(self, *args, **kwargs): + def clean_audio_library(self, **_): """ Clean the audio library """ @@ -419,16 +420,17 @@ def clean_audio_library(self, *args, **kwargs): return result.get('result'), result.get('error') @action - def quit(self, *args, **kwargs): + def quit(self, **_): """ Quit the application """ result = self._get_kodi().Application.Quit() + self._clear_resource() return result.get('result'), result.get('error') @action - def get_songs(self, *args, **kwargs): + def get_songs(self, **_): """ Get the list of songs in the audio library """ @@ -437,7 +439,7 @@ def get_songs(self, *args, **kwargs): return result.get('result'), result.get('error') @action - def get_artists(self, *args, **kwargs): + def get_artists(self, **_): """ Get the list of artists in the audio library """ @@ -446,7 +448,7 @@ def get_artists(self, *args, **kwargs): return result.get('result'), result.get('error') @action - def get_albums(self, *args, **kwargs): + def get_albums(self, **_): """ Get the list of albums in the audio library """ @@ -455,7 +457,7 @@ def get_albums(self, *args, **kwargs): return result.get('result'), result.get('error') @action - def fullscreen(self, *args, **kwargs): + def fullscreen(self, **_): """ Set/unset fullscreen mode """ @@ -471,7 +473,7 @@ def fullscreen(self, *args, **kwargs): return result.get('result'), result.get('error') @action - def shuffle(self, player_id=None, shuffle=None, *args, **kwargs): + def shuffle(self, player_id=None, shuffle=None, **_): """ Set/unset shuffle mode """ @@ -495,7 +497,7 @@ def shuffle(self, player_id=None, shuffle=None, *args, **kwargs): return result.get('result'), result.get('error') @action - def repeat(self, player_id=None, repeat=None, *args, **kwargs): + def repeat(self, player_id=None, repeat=None, **_): """ Set/unset repeat mode """ @@ -543,7 +545,7 @@ def _time_obj_to_pos(t): ) @action - def seek(self, position, player_id=None, *args, **kwargs): + def seek(self, position, player_id=None, **_): """ Move to the specified time position in seconds @@ -573,7 +575,7 @@ def set_position(self, position, player_id=None, *args, **kwargs): return self.seek(*args, position=position, player_id=player_id, **kwargs) @action - def back(self, offset=30, player_id=None, *args, **kwargs): + def back(self, offset=30, player_id=None, **_): """ Move the player execution backward by delta_seconds @@ -594,11 +596,11 @@ def back(self, offset=30, player_id=None, *args, **kwargs): .get('time', {}) ) - position = self._time_obj_to_pos(position) + position = self._time_obj_to_pos(position) - offset return self.seek(player_id=player_id, position=position) @action - def forward(self, offset=30, player_id=None, *args, **kwargs): + def forward(self, offset=30, player_id=None, **_): """ Move the player execution forward by delta_seconds @@ -619,7 +621,7 @@ def forward(self, offset=30, player_id=None, *args, **kwargs): .get('time', {}) ) - position = self._time_obj_to_pos(position) + position = self._time_obj_to_pos(position) + offset return self.seek(player_id=player_id, position=position) @action @@ -715,20 +717,28 @@ def status(self, player_id=None): ) return ret - def toggle_subtitles(self, *args, **kwargs): + def toggle_subtitles(self, *_, **__): raise NotImplementedError - def set_subtitles(self, filename, *args, **kwargs): + def set_subtitles(self, *_, **__): raise NotImplementedError - def remove_subtitles(self, *args, **kwargs): + def remove_subtitles(self, *_, **__): raise NotImplementedError - def is_playing(self, *args, **kwargs): + def is_playing(self, *_, **__): raise NotImplementedError - def load(self, resource, *args, **kwargs): + def load(self, *_, **__): raise NotImplementedError + @property + def supports_local_media(self) -> bool: + return False + + @property + def supports_local_pipe(self) -> bool: + return False + # vim:sw=4:ts=4:et: diff --git a/platypush/plugins/media/mplayer/__init__.py b/platypush/plugins/media/mplayer/__init__.py index e5ca66e51..76946445e 100644 --- a/platypush/plugins/media/mplayer/__init__.py +++ b/platypush/plugins/media/mplayer/__init__.py @@ -4,7 +4,7 @@ import subprocess import threading import time -from dataclasses import asdict, dataclass +from dataclasses import dataclass from multiprocessing import Process, Queue, RLock from queue import Empty from typing import Any, Collection, Dict, List, Optional @@ -395,7 +395,7 @@ def play( resource: str, subtitles: Optional[str] = None, mplayer_args: Optional[List[str]] = None, - **_, + **kwargs, ): """ Play a resource. @@ -412,11 +412,11 @@ def play( if subs: mplayer_args = list(mplayer_args or []) + ['-sub', subs] - resource = self._get_resource(resource) - if resource.startswith('file://'): - resource = resource[7:] + media = self._latest_resource = self._get_resource(resource, **kwargs) + media.open(**kwargs) + self.logger.debug('Playing media: %s', media) + self._exec('loadfile', media.resource, mplayer_args=mplayer_args) - self._exec('loadfile', resource, mplayer_args=mplayer_args) if self.volume: self.set_volume(volume=self.volume) @@ -435,6 +435,10 @@ def stop(self, *_, **__): def _cleanup(self): with self._cleanup_lock: + if self._latest_resource: + self._latest_resource.close() + self._latest_resource = None + if self._player: self._player.terminate() self._player.wait() @@ -666,7 +670,7 @@ def status(self): status.update( { k: v - for k, v in asdict(self._latest_resource).items() + for k, v in self._latest_resource.to_dict().items() if v is not None } ) @@ -688,7 +692,7 @@ def _get_property( property, prefix='pausing_keep_force', wait_for_response=True, - *args, + *args, # noqa: B026 ) or {} ) @@ -757,7 +761,7 @@ def set_property( value, prefix='pausing_keep_force' if property != 'pause' else None, wait_for_response=True, - *args, + *args, # noqa: B026 ) or {} ) @@ -799,7 +803,7 @@ def step_property( value, prefix='pausing_keep_force', wait_for_response=True, - *args, + *args, # noqa: B026 ) or {} ) @@ -820,5 +824,9 @@ def set_subtitles(self, filename: str, *_, **__): self.logger.debug('set_subtitles called with filename=%s', filename) raise NotImplementedError + @property + def supports_local_pipe(self) -> bool: + return False + # vim:sw=4:ts=4:et: diff --git a/platypush/plugins/media/mpv/__init__.py b/platypush/plugins/media/mpv/__init__.py index c5ff2aa27..7a3cd2703 100644 --- a/platypush/plugins/media/mpv/__init__.py +++ b/platypush/plugins/media/mpv/__init__.py @@ -1,10 +1,10 @@ import os -from dataclasses import asdict from typing import Any, Dict, Optional, Type from urllib.parse import quote from platypush.plugins import action from platypush.plugins.media import PlayerState, MediaPlugin +from platypush.plugins.media._resource import MediaResource, YoutubeMediaResource from platypush.message.event.media import ( MediaEvent, MediaPlayEvent, @@ -23,7 +23,6 @@ class MediaMpvPlugin(MediaPlugin): """ _default_mpv_args = { - 'ytdl': True, 'start_event_thread': True, } @@ -49,16 +48,42 @@ def __init__( self._player = None self._latest_state = PlayerState.STOP - def _init_mpv(self, args=None): + def _init_mpv( + self, + args: Optional[dict] = None, + resource: Optional[MediaResource] = None, + youtube_format: Optional[str] = None, + youtube_audio_format: Optional[str] = None, + only_audio: bool = False, + ): import mpv - mpv_args = self.args.copy() + mpv_args: dict = {**self.args} + + if isinstance(resource, YoutubeMediaResource): + youtube_format = youtube_format or self.youtube_format + if only_audio: + youtube_format = ( + youtube_audio_format or self.youtube_audio_format or youtube_format + ) + + mpv_args.update( + { + 'ytdl': True, + 'ytdl_format': youtube_format, + 'script_opts': f'ytdl_hook-ytdl-path={self._ytdl}', + } + ) + if args: mpv_args.update(args) + mpv_args.pop('metadata', None) + for k, v in self._env.items(): os.environ[k] = v + self.logger.debug('Initializing mpv with args: %s', mpv_args) self._player = mpv.MPV(**mpv_args) self._player._event_callbacks += [self._event_callback()] @@ -159,6 +184,10 @@ def play( *_, subtitles: Optional[str] = None, fullscreen: Optional[bool] = None, + youtube_format: Optional[str] = None, + youtube_audio_format: Optional[str] = None, + only_audio: bool = False, + metadata: Optional[Dict[str, Any]] = None, **args, ): """ @@ -168,20 +197,31 @@ def play( :param subtitles: Path to optional subtitle file :param args: Extra runtime arguments that will be passed to the mpv executable as a key-value dict (keys without `--` prefix) + :param fullscreen: Override the default fullscreen setting. + :param youtube_format: Override the default youtube format setting. + :param youtube_audio_format: Override the default youtube audio format + setting. + :param only_audio: Set to True if you want to play only the audio of a + youtube video. + :param metadata: Optional metadata to attach to the resource. """ self._post_event(MediaPlayRequestEvent, resource=resource) if fullscreen is not None: args['fs'] = fullscreen - self._init_mpv(args) - - resource = self._get_resource(resource) - if resource.startswith('file://'): - resource = resource[7:] + media = self._latest_resource = self._get_resource(resource, metadata=metadata) + self._init_mpv( + args, + resource=media, + youtube_format=youtube_format, + youtube_audio_format=youtube_audio_format, + only_audio=only_audio, + ) assert self._cur_player, 'The player is not ready' - self._cur_player.play(resource) + self._cur_player.play(media.resource or media.url) + if self.volume: self.set_volume(volume=self.volume) if subtitles: @@ -498,7 +538,7 @@ def status(self, **_): status.update( { k: v - for k, v in asdict(self._latest_resource).items() + for k, v in self._latest_resource.to_dict().items() if v is not None } ) @@ -516,11 +556,9 @@ def status(self, **_): self._latest_state = self._state return status - def _get_resource(self, resource): - if self._is_youtube_resource(resource): - return resource # mpv can handle YouTube streaming natively - - return super()._get_resource(resource) + @property + def supports_local_pipe(self) -> bool: + return False # vim:sw=4:ts=4:et: diff --git a/platypush/plugins/media/omxplayer/__init__.py b/platypush/plugins/media/omxplayer/__init__.py deleted file mode 100644 index e704fc389..000000000 --- a/platypush/plugins/media/omxplayer/__init__.py +++ /dev/null @@ -1,446 +0,0 @@ -from dataclasses import asdict -import enum -import threading -from typing import Collection, Optional - -import urllib.parse - -from platypush.context import get_bus -from platypush.plugins.media import MediaPlugin, PlayerState -from platypush.message.event.media import ( - MediaPlayEvent, - MediaPauseEvent, - MediaStopEvent, - MediaSeekEvent, - MediaPlayRequestEvent, -) - -from platypush.plugins import action - - -class PlayerEvent(enum.Enum): - """ - Supported player events. - """ - - STOP = 'stop' - PLAY = 'play' - PAUSE = 'pause' - - -class MediaOmxplayerPlugin(MediaPlugin): - """ - Plugin to control video and media playback using OMXPlayer. - """ - - def __init__( - self, args: Optional[Collection[str]] = None, timeout: float = 20.0, **kwargs - ): - """ - :param args: Arguments that will be passed to the OMXPlayer constructor - (e.g. subtitles, volume, start position, window size etc.) see - https://github.com/popcornmix/omxplayer#synopsis and - https://python-omxplayer-wrapper.readthedocs.io/en/latest/omxplayer/#omxplayer.player.OMXPlayer - :param timeout: How long the plugin should wait for a video to start upon play request (default: 20 seconds). - """ - - super().__init__(**kwargs) - - if args is None: - args = [] - - self.args = args - self.timeout = timeout - self._player = None - self._handlers = {e.value: [] for e in PlayerEvent} - self._play_started = threading.Event() - - @action - def play(self, *args, resource=None, subtitles=None, **_): - """ - Play or resume playing a resource. - - :param resource: Resource to play. Supported types: - - * Local files (format: ``file:///``) - * Remote videos (format: ``https:///``) - * YouTube videos (format: ``https://www.youtube.com/watch?v=``) - * Torrents (format: Magnet links, Torrent URLs or local Torrent files) - - :param subtitles: Subtitles file - """ - if not resource: - if not self._player: - self.logger.warning('No OMXPlayer instances running') - else: - self._player.play() - - return self.status() - - self._play_started.clear() - self._post_event(MediaPlayRequestEvent, resource=resource) - - if subtitles: - args += ('--subtitles', subtitles) - - resource = self._get_resource(resource) - if self._player: - try: - self._player.stop() - self._player = None - except Exception as e: - self.logger.exception(e) - self.logger.warning( - 'Unable to stop a previously running instance ' - + 'of OMXPlayer, trying to play anyway' - ) - - from dbus import DBusException - - try: - from omxplayer import OMXPlayer - - self._player = OMXPlayer(resource, args=self.args) - except DBusException as e: - self.logger.warning( - 'DBus connection failed: you will probably not ' - + 'be able to control the media' - ) - self.logger.exception(e) - - self._post_event(MediaPlayEvent, resource=resource) - self._init_player_handlers() - if not self._play_started.wait(timeout=self.timeout): - self.logger.warning( - f'The player has not sent a play started event within {self.timeout}' - ) - return self.status() - - @action - def pause(self): - """Pause the playback""" - if self._player: - self._player.play_pause() - return self.status() - - @action - def stop(self): - """Stop the playback (same as quit)""" - return self.quit() - - @action - def quit(self): - """Quit the player""" - from omxplayer.player import OMXPlayerDeadError - - if self._player: - try: - try: - self._player.stop() - except Exception as e: - self.logger.warning(f'Could not stop player: {str(e)}') - - self._player.quit() - except OMXPlayerDeadError: - pass - finally: - self._player = None - - return {'status': 'stop'} - - def get_volume(self) -> Optional[float]: - """ - :return: The player volume in percentage [0, 100]. - """ - if self._player: - return self._player.volume() * 100 - - @action - def voldown(self, step=10.0): - """ - Decrease the volume. - - :param step: Volume decrease step between 0 and 100 (default: 10%). - :type step: float - """ - if self._player: - vol = self.get_volume() - if vol is not None: - self.set_volume(max(0, vol - step)) - return self.status() - - @action - def volup(self, step=10.0): - """ - Increase the volume. - - :param step: Volume increase step between 0 and 100 (default: 10%). - :type step: float - """ - if self._player: - vol = self.get_volume() - if vol is not None: - self.set_volume(min(100, vol + step)) - return self.status() - - @action - def back(self, offset=30): - """Back by (default: 30) seconds""" - if self._player: - self._player.seek(-offset) - return self.status() - - @action - def forward(self, offset=30): - """Forward by (default: 30) seconds""" - if self._player: - self._player.seek(offset) - return self.status() - - @action - def next(self): - """Play the next track/video""" - if self._player: - self._player.stop() - - if self._videos_queue: - video = self._videos_queue.pop(0) - self.play(video) - - return self.status() - - @action - def hide_subtitles(self): - """Hide the subtitles""" - if self._player: - self._player.hide_subtitles() - return self.status() - - @action - def hide_video(self): - """Hide the video""" - if self._player: - self._player.hide_video() - return self.status() - - @action - def is_playing(self, *_, **__) -> bool: - """ - :returns: True if it's playing, False otherwise - """ - return self._player.is_playing() if self._player else False - - @action - def load(self, resource: str, *_, pause: bool = False, **__): - """ - Load a resource/video in the player. - - :param resource: URL or filename to load - :param pause: If set, load the video in paused mode (default: False) - """ - - if self._player: - self._player.load(resource, pause=pause) - return self.status() - - @action - def metadata(self): - """Get the metadata of the current video""" - if self._player: - return self._player.metadata() - return self.status() - - @action - def mute(self, *_, **__): - """Mute the player""" - if self._player: - self._player.mute() - return self.status() - - @action - def unmute(self, *_, **__): - """Unmute the player""" - if self._player: - self._player.unmute() - return self.status() - - @action - def seek(self, position: float, **__): - """ - Seek to the specified number of seconds from the start. - - :param position: Number of seconds from the start - """ - if self._player: - self._player.set_position(position) - return self.status() - - @action - def set_position(self, position: float, **__): - """ - Seek to the specified number of seconds from the start (same as :meth:`.seek`). - - :param position: Number of seconds from the start - """ - return self.seek(position) - - @action - def set_volume(self, volume: float, *_, **__): - """ - Set the volume - - :param volume: Volume value between 0 and 100 - """ - - if self._player: - self._player.set_volume(volume / 100) - return self.status() - - @action - def status(self): - """ - Get the current player state. - - :returns: A dictionary containing the current state. - - Format:: - - output = { - "duration": Duration in seconds, - "filename": Media filename, - "fullscreen": true or false, - "mute": true or false, - "path": Media path - "pause": true or false, - "position": Position in seconds - "seekable": true or false - "state": play, pause or stop - "title": Media title - "url": Media url - "volume": Volume between 0 and 100 - "volume_max": 100, - } - """ - - from omxplayer.player import OMXPlayerDeadError - from dbus import DBusException - - if not self._player: - return {'state': PlayerState.STOP.value} - - try: - state = self._player.playback_status().lower() - except (OMXPlayerDeadError, DBusException) as e: - self.logger.warning('Could not retrieve player status: %s', e) - if isinstance(e, OMXPlayerDeadError): - self._player = None - - return {'state': PlayerState.STOP.value} - - if state == 'playing': - state = PlayerState.PLAY.value - elif state == 'stopped': - state = PlayerState.STOP.value - elif state == 'paused': - state = PlayerState.PAUSE.value - - status = { - 'duration': self._player.duration(), - 'filename': urllib.parse.unquote(self._player.get_source()).split('/')[-1] - if self._player.get_source().startswith('file://') - else None, - 'fullscreen': self._player.fullscreen(), - 'mute': self._player._is_muted, - 'path': self._player.get_source(), - 'pause': state == PlayerState.PAUSE.value, - 'position': max(0, self._player.position()), - 'seekable': self._player.can_seek(), - 'state': state, - 'title': urllib.parse.unquote(self._player.get_source()).split('/')[-1] - if self._player.get_source().startswith('file://') - else None, - 'url': self._player.get_source(), - 'volume': self.get_volume(), - 'volume_max': 100, - } - - if self._latest_resource: - status.update( - { - k: v - for k, v in asdict(self._latest_resource).items() - if v is not None - } - ) - - return status - - def add_handler(self, event_type, callback): - if event_type not in self._handlers.keys(): - raise AttributeError(f'{event_type} is not a valid PlayerEvent type') - - self._handlers[event_type].append(callback) - - @staticmethod - def _post_event(evt_type, **evt): - bus = get_bus() - bus.post(evt_type(player='local', plugin='media.omxplayer', **evt)) - - def on_play(self): - def _f(player): - if self.volume and not self._play_started.is_set(): - self.set_volume(self.volume) - self._play_started.set() - - resource = player.get_source() - self._post_event(MediaPlayEvent, resource=resource) - for callback in self._handlers[PlayerEvent.PLAY.value]: - callback(resource) - - return _f - - def on_pause(self): - def _f(player): - resource = player.get_source() - self._post_event(MediaPauseEvent, resource=resource) - for callback in self._handlers[PlayerEvent.PAUSE.value]: - callback(resource) - - return _f - - def on_stop(self): - def _f(*_, **__): - self._post_event(MediaStopEvent) - for callback in self._handlers[PlayerEvent.STOP.value]: - callback() - - return _f - - def on_seek(self): - def _f(player, *_, **__): - self._post_event(MediaSeekEvent, position=player.position()) - - return _f - - def _init_player_handlers(self): - if not self._player: - return - - self._player.playEvent += self.on_play() - self._player.pauseEvent += self.on_pause() - self._player.stopEvent += self.on_stop() - self._player.exitEvent += self.on_stop() - self._player.positionEvent += self.on_seek() - self._player.seekEvent += self.on_seek() - - def toggle_subtitles(self, *_, **__): - raise NotImplementedError - - def set_subtitles(self, *_, **__): - raise NotImplementedError - - def remove_subtitles(self, *_, **__): - raise NotImplementedError - - -# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/media/omxplayer/manifest.json b/platypush/plugins/media/omxplayer/manifest.json deleted file mode 100644 index 0f112e183..000000000 --- a/platypush/plugins/media/omxplayer/manifest.json +++ /dev/null @@ -1,17 +0,0 @@ -{ - "manifest": { - "events": {}, - "install": { - "pip": [ - "omxplayer-wrapper", - "yt-dlp" - ], - "apt": [ - "omxplayer", - "yt-dlp" - ] - }, - "package": "platypush.plugins.media.omxplayer", - "type": "plugin" - } -} \ No newline at end of file diff --git a/platypush/plugins/media/search/__init__.py b/platypush/plugins/media/search/__init__.py deleted file mode 100644 index 0d5330f33..000000000 --- a/platypush/plugins/media/search/__init__.py +++ /dev/null @@ -1,33 +0,0 @@ -import logging -from typing import Optional - -from .. import MediaPlugin - - -class MediaSearcher: - """ - Base class for media searchers - """ - - def __init__(self, *args, media_plugin: Optional[MediaPlugin] = None, **kwargs): - self.logger = logging.getLogger(self.__class__.__name__) - self.media_plugin = media_plugin - - def search(self, query, *args, **kwargs): - raise NotImplementedError('The search method should be implemented ' + - 'by a derived class') - - -from .local import LocalMediaSearcher -from .youtube import YoutubeMediaSearcher -from .torrent import TorrentMediaSearcher -from .plex import PlexMediaSearcher -from .jellyfin import JellyfinMediaSearcher - -__all__ = [ - 'MediaSearcher', 'LocalMediaSearcher', 'TorrentMediaSearcher', - 'YoutubeMediaSearcher', 'PlexMediaSearcher', 'JellyfinMediaSearcher', -] - - -# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/media/search/plex.py b/platypush/plugins/media/search/plex.py deleted file mode 100644 index 9f09869e2..000000000 --- a/platypush/plugins/media/search/plex.py +++ /dev/null @@ -1,61 +0,0 @@ -from platypush.context import get_plugin -from platypush.plugins.media.search import MediaSearcher - - -class PlexMediaSearcher(MediaSearcher): - def search(self, query, **kwargs): - """ - Performs a Plex search using the configured :class:`platypush.plugins.media.plex.MediaPlexPlugin` instance if - it is available. - """ - - try: - plex = get_plugin('media.plex') - except RuntimeError: - return [] - - self.logger.info('Searching Plex for "{}"'.format(query)) - results = [] - - for result in plex.search(title=query).output: - results.extend(self._flatten_result(result)) - - self.logger.info('{} Plex results found for the search query "{}"'.format(len(results), query)) - return results - - @staticmethod - def _flatten_result(result): - def parse_part(media, part, episode=None, sub_media=None): - if 'episodes' in media: - del media['episodes'] - - return { - **{k: v for k, v in result.items() if k not in ['media', 'type']}, - 'media_type': result.get('type'), - 'type': 'plex', - **{k: v for k, v in media.items() if k not in ['parts']}, - **part, - 'title': '{}{}{}'.format( - result.get('title', ''), - ' [{}]'.format(episode['season_episode']) if (episode or {}).get('season_episode') else '', - ' {}'.format(sub_media['title']) if (sub_media or {}).get('title') else '', - ), - 'summary': episode['summary'] if (episode or {}).get('summary') else media.get('summary'), - } - - results = [] - - for media in result.get('media', []): - if 'episodes' in media: - for episode in media['episodes']: - for sub_media in episode.get('media', []): - for part in sub_media.get('parts', []): - results.append(parse_part(media=media, episode=episode, sub_media=sub_media, part=part)) - else: - for part in media.get('parts', []): - results.append(parse_part(media=media, part=part)) - - return results - - -# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/media/vlc/__init__.py b/platypush/plugins/media/vlc/__init__.py index fecb2900d..2d0b63795 100644 --- a/platypush/plugins/media/vlc/__init__.py +++ b/platypush/plugins/media/vlc/__init__.py @@ -1,11 +1,9 @@ -from dataclasses import asdict import os import threading -import urllib.parse from typing import Collection, Optional from platypush.context import get_bus -from platypush.plugins.media import PlayerState, MediaPlugin +from platypush.plugins.media import PlayerState, MediaPlugin, MediaResource from platypush.message.event.media import ( MediaPlayEvent, MediaPlayRequestEvent, @@ -30,7 +28,7 @@ def __init__( args: Optional[Collection[str]] = None, fullscreen: bool = False, volume: int = 100, - **kwargs + **kwargs, ): """ :param args: List of extra arguments to pass to the VLC executable (e.g. @@ -52,8 +50,6 @@ def __init__( self._default_fullscreen = fullscreen self._default_volume = volume self._on_stop_callbacks = [] - self._title = None - self._filename = None self._monitor_thread: Optional[threading.Thread] = None self._on_stop_event = threading.Event() self._stop_lock = threading.RLock() @@ -86,73 +82,109 @@ def _watched_event_types(cls): if hasattr(vlc.EventType, evt) ] - def _init_vlc(self, resource): - import vlc - + def _init_vlc(self, resource: MediaResource, cache_streams: bool): if self._instance: self.logger.info('Another instance is running, waiting for it to terminate') self._on_stop_event.wait() - self._reset_state() - for k, v in self._env.items(): os.environ[k] = v self._monitor_thread = threading.Thread(target=self._player_monitor) self._monitor_thread.start() - self._instance = vlc.Instance(*self._args) - assert self._instance, 'Could not create a VLC instance' - self._player = self._instance.media_player_new(resource) + self._set_media(resource, cache_streams=cache_streams) + assert self._player, 'Could not create a VLC player instance' for evt in self._watched_event_types(): self._player.event_manager().event_attach( eventtype=evt, callback=self._event_callback() ) + def _set_media( + self, resource: MediaResource, *_, cache_streams: bool = False, **__ + ): + import vlc + + if not self._instance: + self._instance = vlc.Instance(*self._args) + + assert self._instance, 'Could not create a VLC instance' + if not self._player: + self._player = self._instance.media_player_new() + + fd = resource.fd or resource.open(cache_streams=cache_streams) + + if not cache_streams and fd is not None: + self._player.set_media(self._instance.media_new_fd(fd.fileno())) + else: + self._player.set_media(self._instance.media_new(resource.resource)) + def _player_monitor(self): self._on_stop_event.wait() self.logger.info('VLC stream terminated') - self._reset_state() + self.quit() def _reset_state(self): - with self._stop_lock: - self._latest_seek = None - self._title = None - self._filename = None - self._on_stop_event.clear() + self._latest_seek = None + self._on_stop_event.clear() - if self._player: - self.logger.info('Releasing VLC player resource') - self._player.release() - self._player = None + if self._latest_resource: + self.logger.debug('Closing latest resource') + self._latest_resource.close() + self._latest_resource = None - if self._instance: - self.logger.info('Releasing VLC instance resource') - self._instance.release() - self._instance = None + def _close_player(self): + if self._player: + self.logger.info('Releasing VLC player resource') + self._player.stop() + + if self._player.get_media(): + self.logger.debug('Releasing VLC media resource') + self._player.get_media().release() + + self.logger.debug('Releasing VLC player instance') + self._player.release() + self._player = None + + if self._instance: + self.logger.info('Releasing VLC instance resource') + self._instance.release() + self._instance = None @staticmethod def _post_event(evt_type, **evt): bus = get_bus() bus.post(evt_type(player='local', plugin='media.vlc', **evt)) + @property + def _title(self) -> Optional[str]: + if not (self._player and self._player.get_media() and self._latest_resource): + return None + + return ( + self._player.get_title() + or self._latest_resource.title + or self._latest_resource.filename + or self._player.get_media().get_mrl() + or None + ) + def _event_callback(self): def callback(event): from vlc import EventType - self.logger.debug('Received vlc event: %s', event) + self.logger.debug('Received VLC event: %s', event.type) if event.type == EventType.MediaPlayerPlaying: # type: ignore self._post_event(MediaPlayEvent, resource=self._get_current_resource()) elif event.type == EventType.MediaPlayerPaused: # type: ignore self._post_event(MediaPauseEvent) - elif ( - event.type == EventType.MediaPlayerStopped # type: ignore - or event.type == EventType.MediaPlayerEndReached # type: ignore + elif event.type in ( + EventType.MediaPlayerStopped, # type: ignore + EventType.MediaPlayerEndReached, # type: ignore ): self._on_stop_event.set() self._post_event(MediaStopEvent) - for cbk in self._on_stop_callbacks: - cbk() + self._reset_state() elif self._player and ( event.type in ( @@ -160,7 +192,6 @@ def callback(event): EventType.MediaPlayerMediaChanged, # type: ignore ) ): - self._title = self._player.get_title() or self._filename if event.type == EventType.MediaPlayerMediaChanged: # type: ignore self._post_event(NewPlayingMediaEvent, resource=self._title) elif event.type == EventType.MediaPlayerLengthChanged: # type: ignore @@ -180,6 +211,9 @@ def callback(event): self._post_event(MediaMuteChangedEvent, mute=True) elif event.type == EventType.MediaPlayerUnmuted: # type: ignore self._post_event(MediaMuteChangedEvent, mute=False) + elif event.type == EventType.MediaPlayerEncounteredError: # type: ignore + self.logger.error('VLC media player encountered an error') + self._reset_state() return callback @@ -190,6 +224,9 @@ def play( subtitles: Optional[str] = None, fullscreen: Optional[bool] = None, volume: Optional[int] = None, + cache_streams: Optional[bool] = None, + metadata: Optional[dict] = None, + **_, ): """ Play a resource. @@ -201,15 +238,22 @@ def play( `fullscreen` configured value or False) :param volume: Set to explicitly set the playback volume (default: `volume` configured value or 100) + :param cache_streams: Overrides the ``cache_streams`` configuration + value. + :param metadata: Optional metadata to pass to the resource. """ if not resource: return self.pause() self._post_event(MediaPlayRequestEvent, resource=resource) - resource = self._get_resource(resource) - self._filename = resource - self._init_vlc(resource) + cache_streams = ( + cache_streams if cache_streams is not None else self.cache_streams + ) + media = self._get_resource(resource, metadata=metadata) + self._latest_resource = media + self.quit() + self._init_vlc(media, cache_streams=cache_streams) if subtitles and self._player: if subtitles.startswith('file://'): subtitles = subtitles[len('file://') :] @@ -242,13 +286,17 @@ def quit(self, *_, **__): """Quit the player (same as `stop`)""" with self._stop_lock: if not self._player: - self.logger.warning('No vlc instance is running') + self.logger.debug('No vlc instance is running') return self.status() - self._player.stop() - self._on_stop_event.wait(timeout=5) self._reset_state() - return self.status() + self._close_player() + self._on_stop_event.wait(timeout=5) + + for cbk in self._on_stop_callbacks: + cbk() + + return self.status() @action def stop(self, *_, **__): @@ -385,7 +433,10 @@ def load(self, resource, *_, **args): """ if not self._player: return self.play(resource, **args) - self._player.set_media(resource) + + media = self._get_resource(resource) + self._reset_state() + self._set_media(media) return self.status() @action @@ -419,10 +470,10 @@ def status(self): import vlc with self._stop_lock: - if not self._player: + if not (self._player and self._latest_resource): return {'state': PlayerState.STOP.value} - status = {} + status = self._latest_resource.to_dict() vlc_state = self._player.get_state() if vlc_state == vlc.State.Playing: # type: ignore @@ -432,12 +483,6 @@ def status(self): else: status['state'] = PlayerState.STOP.value - status['url'] = ( - urllib.parse.unquote(self._player.get_media().get_mrl()) - if self._player.get_media() - else None - ) - status['position'] = ( float(self._player.get_time() / 1000) if self._player.get_time() is not None @@ -445,10 +490,13 @@ def status(self): ) media = self._player.get_media() - status['duration'] = ( - media.get_duration() / 1000 - if media and media.get_duration() is not None - else None + status['duration'] = status.get( + 'duration', + ( + media.get_duration() / 1000 + if media and media.get_duration() is not None + else None + ), ) status['seekable'] = status['duration'] is not None @@ -457,23 +505,10 @@ def status(self): status['path'] = status['url'] status['pause'] = status['state'] == PlayerState.PAUSE.value status['percent_pos'] = self._player.get_position() * 100 - status['filename'] = self._filename + status['filename'] = self._latest_resource.filename status['title'] = self._title status['volume'] = self._player.audio_get_volume() status['volume_max'] = 100 - - if ( - status['state'] in (PlayerState.PLAY.value, PlayerState.PAUSE.value) - and self._latest_resource - ): - status.update( - { - k: v - for k, v in asdict(self._latest_resource).items() - if v is not None - } - ) - return status def on_stop(self, callback): @@ -482,6 +517,10 @@ def on_stop(self, callback): def _get_current_resource(self): if not self._player or not self._player.get_media(): return None + + if self._latest_resource: + return self._latest_resource.url + return self._player.get_media().get_mrl()