Skip to content

Commit

Permalink
[#422] Enabled support for yt-dlp mux+transcoding in media plugins
Browse files Browse the repository at this point in the history
  • Loading branch information
blacklight committed Aug 18, 2024
1 parent ca5853c commit 5080caa
Show file tree
Hide file tree
Showing 49 changed files with 2,218 additions and 1,682 deletions.
6 changes: 0 additions & 6 deletions docs/source/platypush/plugins/media.omxplayer.rst

This file was deleted.

1 change: 0 additions & 1 deletion docs/source/plugins.rst
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ Plugins
platypush/plugins/media.kodi.rst
platypush/plugins/media.mplayer.rst
platypush/plugins/media.mpv.rst
platypush/plugins/media.omxplayer.rst
platypush/plugins/media.plex.rst
platypush/plugins/media.subtitles.rst
platypush/plugins/media.vlc.rst
Expand Down
13 changes: 9 additions & 4 deletions platypush/backend/http/app/streaming/plugins/media/_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,15 @@ def load_media_map() -> MediaMap:
logger().warning('Could not load media map: %s', e)
return {}

return {
media_id: MediaHandler.build(**media_info)
for media_id, media_info in media_map.items()
}
parsed_map = {}
for media_id, media_info in media_map.items():
try:
parsed_map[media_id] = MediaHandler.build(**media_info)
except Exception as e:
logger().debug('Could not load media %s: %s', media_id, e)
continue

return parsed_map


def save_media_map(new_map: MediaMap):
Expand Down
4 changes: 0 additions & 4 deletions platypush/backend/http/media/handlers/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from abc import ABC, abstractmethod
import hashlib
import logging
import os
from typing import Generator, Optional

from platypush.message import JSONAble
Expand Down Expand Up @@ -57,9 +56,6 @@ def build(cls, source: str, *args, **kwargs) -> 'MediaHandler':
logging.exception(e)
errors[hndl_class.__name__] = str(e)

if os.path.exists(source):
source = f'file://{source}'

raise AttributeError(
f'The source {source} has no handlers associated. Errors: {errors}'
)
Expand Down
3 changes: 3 additions & 0 deletions platypush/backend/http/media/handlers/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ class FileHandler(MediaHandler):
prefix_handlers = ['file://']

def __init__(self, source, *args, **kwargs):
if isinstance(source, str) and os.path.exists(source):
source = f'file://{source}'

super().__init__(source, *args, **kwargs)

self.path = os.path.abspath(
Expand Down
1 change: 1 addition & 0 deletions platypush/bus/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ def poll(self):

try:
data = msg.get('data', b'').decode('utf-8')
logger.debug('Received message on the Redis bus: %r', data)
parsed_msg = Message.build(data)
if parsed_msg and self.on_message:
self.on_message(parsed_msg)
Expand Down
49 changes: 47 additions & 2 deletions platypush/common/gstreamer/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ def __init__(self):

self.bus = self.pipeline.get_bus()
self.bus.add_signal_watch()
self.bus.connect('message::eos', self.on_eos)
self.bus.connect('message::error', self.on_error)
self.bus.connect('message', self.on_message)
self.data_ready = threading.Event()
self.data = None
self._gst_state = Gst.State.NULL

def add(self, element_name: str, *args, **props):
el = Gst.ElementFactory.make(element_name, *args)
Expand Down Expand Up @@ -94,6 +94,28 @@ def set_volume(self, volume: float):
assert self.source, 'No source initialized'
self.source.set_property('volume', volume)

def _msg_handler(self, message) -> bool:
from gi.repository import Gst # type: ignore[attr-defined]

if message.type == Gst.MessageType.EOS:
self.on_eos()
return True

if message.type == Gst.MessageType.ERROR:
err, debug = message.parse_error()
self.on_error(err, debug)
return True

if message.type == Gst.MessageType.STATE_CHANGED:
old_state, new_state, _ = message.parse_state_changed()[:3]
self.on_state_changed(old_state, new_state)
return True

return False # The message was not handled

def on_message(self, _, message, *__):
self._msg_handler(message)

def on_buffer(self, sink):
sample = GstApp.AppSink.pull_sample(sink)
buffer = sample.get_buffer()
Expand All @@ -106,6 +128,29 @@ def on_eos(self, *_, **__):
self.logger.info('End of stream event received')
self.stop()

def on_state_changed(self, old_state, new_state):
from gi.repository import Gst # type: ignore[attr-defined]

if (
old_state == new_state
or new_state == self._gst_state
or old_state != self._gst_state
):
return

self._gst_state = new_state

if new_state == Gst.State.PLAYING:
self.on_play()
elif new_state == Gst.State.PAUSED:
self.on_pause()

def on_play(self):
self.logger.debug('GStreamer playback started')

def on_pause(self):
self.logger.debug('GStreamer playback paused')

def on_error(self, _, msg):
self.logger.warning('GStreamer pipeline error: %s', msg.parse_error())
self.stop()
Expand Down
46 changes: 41 additions & 5 deletions platypush/message/event/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import copy
import json
import logging
import random
Expand All @@ -7,11 +6,11 @@

from dataclasses import dataclass, field
from datetime import date
from typing import Any
from typing import Any, Optional, Union

from platypush.config import Config
from platypush.message import Message
from platypush.utils import get_event_class_by_type
from platypush.utils import get_event_class_by_type, get_plugin_name_by_class

logger = logging.getLogger('platypush')

Expand Down Expand Up @@ -261,7 +260,7 @@ def as_dict(self):
"""
Converts the event into a dictionary
"""
args = copy.deepcopy(self.args)
args = dict(deepcopy(self.args))
flatten(args)
return {
'type': 'event',
Expand All @@ -277,7 +276,7 @@ def __str__(self):
Overrides the str() operator and converts
the message into a UTF-8 JSON string
"""
args = copy.deepcopy(self.args)
args = deepcopy(self.args)
flatten(args)
return json.dumps(self.as_dict(), cls=self.Encoder)

Expand All @@ -297,6 +296,43 @@ class EventMatchResult:
parsed_args: dict = field(default_factory=dict)


def deepcopy(
args: Union[dict, list], _out: Optional[Union[dict, list]] = None
) -> Union[dict, list]:
"""
Workaround implementation of deepcopy that doesn't raise exceptions
on non-pickeable objects.
"""
from platypush.plugins import Plugin

if _out is None:
_out = {} if isinstance(args, dict) else []

if isinstance(args, list):
_out = [None] * len(args)
for i, v in enumerate(args):
if isinstance(v, dict):
_out[i] = deepcopy(v)
elif isinstance(v, (list, tuple, set)):
_out[i] = deepcopy(list(v))
elif isinstance(v, Plugin):
_out[i] = get_plugin_name_by_class(v.__class__)
else:
_out[i] = v
elif isinstance(args, dict):
for k, v in args.items():
if isinstance(v, dict):
_out[k] = deepcopy(v)
elif isinstance(v, (list, tuple, set)):
_out[k] = deepcopy(list(v))
elif isinstance(v, Plugin):
_out[k] = get_plugin_name_by_class(v.__class__)
else:
_out[k] = v

return _out


def flatten(args):
"""
Flatten a nested dictionary for string serialization.
Expand Down
Loading

0 comments on commit 5080caa

Please sign in to comment.