Skip to content

Commit

Permalink
Use pulsectl library for PulseAudio connection
Browse files Browse the repository at this point in the history
Get rid of internal library code and use pulsectl library to communicate
with PulseAudio server.

This is a breaking change as the library uses the much more powerful
native interface instead of the CLI interface, requiring the need to
change the default port.

On the bright side, this also solves some issues with the existing
implementation:

  - There was no test if the complete list of loaded modules was
    already received. If not all data could be read at once, the
    remaining modules not yet in the buffer were considered absent,
    resulting in unreliable behavior when a lot of modules were loaded
    on the server.

  - A switch could be turned on before the list of loaded modules was
    loaded, leading to a loopback module being loaded even though this
    module was already active (#32016).
  • Loading branch information
breiti committed May 1, 2020
1 parent 8ec5e53 commit 187d9ee
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 114 deletions.
1 change: 1 addition & 0 deletions homeassistant/components/pulseaudio_loopback/manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@
"domain": "pulseaudio_loopback",
"name": "PulseAudio Loopback",
"documentation": "https://www.home-assistant.io/integrations/pulseaudio_loopback",
"requirements": ["pulsectl==20.2.4"],
"codeowners": []
}
163 changes: 49 additions & 114 deletions homeassistant/components/pulseaudio_loopback/switch.py
Original file line number Diff line number Diff line change
@@ -1,52 +1,32 @@
"""Switch logic for loading/unloading pulseaudio loopback modules."""
from datetime import timedelta
import logging
import re
import socket

from pulsectl import Pulse, PulseError
import voluptuous as vol

from homeassistant import util
from homeassistant.components.switch import PLATFORM_SCHEMA, SwitchEntity
from homeassistant.const import CONF_HOST, CONF_NAME, CONF_PORT
import homeassistant.helpers.config_validation as cv

DOMAIN = "pulseaudio_loopback"

_LOGGER = logging.getLogger(__name__)
_PULSEAUDIO_SERVERS = {}

CONF_BUFFER_SIZE = "buffer_size"
CONF_SINK_NAME = "sink_name"
CONF_SOURCE_NAME = "source_name"
CONF_TCP_TIMEOUT = "tcp_timeout"

DEFAULT_BUFFER_SIZE = 1024
DEFAULT_HOST = "localhost"
DEFAULT_NAME = "paloopback"
DEFAULT_PORT = 4712
DEFAULT_TCP_TIMEOUT = 3
DEFAULT_PORT = 4713

IGNORED_SWITCH_WARN = "Switch is already in the desired state. Ignoring."

LOAD_CMD = "load-module module-loopback sink={0} source={1}"

MIN_TIME_BETWEEN_FORCED_SCANS = timedelta(milliseconds=100)
MIN_TIME_BETWEEN_SCANS = timedelta(seconds=10)
MOD_REGEX = (
r"index: ([0-9]+)\s+name: <module-loopback>"
r"\s+argument: (?=<.*sink={0}.*>)(?=<.*source={1}.*>)"
)

UNLOAD_CMD = "unload-module {0}"

PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
{
vol.Required(CONF_SINK_NAME): cv.string,
vol.Required(CONF_SOURCE_NAME): cv.string,
vol.Optional(CONF_BUFFER_SIZE, default=DEFAULT_BUFFER_SIZE): cv.positive_int,
vol.Optional(CONF_HOST, default=DEFAULT_HOST): cv.string,
vol.Optional(CONF_HOST): cv.string,
vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
vol.Optional(CONF_PORT, default=DEFAULT_PORT): cv.port,
vol.Optional(CONF_TCP_TIMEOUT, default=DEFAULT_TCP_TIMEOUT): cv.positive_int,
}
)

Expand All @@ -58,96 +38,61 @@ def setup_platform(hass, config, add_entities, discovery_info=None):
source_name = config.get(CONF_SOURCE_NAME)
host = config.get(CONF_HOST)
port = config.get(CONF_PORT)
buffer_size = config.get(CONF_BUFFER_SIZE)
tcp_timeout = config.get(CONF_TCP_TIMEOUT)

hass.data.setdefault(DOMAIN, {})

server_id = str.format("{0}:{1}", host, port)

if server_id in _PULSEAUDIO_SERVERS:
server = _PULSEAUDIO_SERVERS[server_id]
if host:
connect_to_server = server_id
else:
server = PAServer(host, port, buffer_size, tcp_timeout)
_PULSEAUDIO_SERVERS[server_id] = server
connect_to_server = None

add_entities([PALoopbackSwitch(hass, name, server, sink_name, source_name)])
if server_id in hass.data[DOMAIN]:
server = hass.data[DOMAIN][server_id]
else:
server = Pulse(server=connect_to_server, connect=False, threading_lock=True)
hass.data[DOMAIN][server_id] = server

add_entities([PALoopbackSwitch(name, server, sink_name, source_name)], True)

class PAServer:
"""Representation of a Pulseaudio server."""

_current_module_state = ""
class PALoopbackSwitch(SwitchEntity):
"""Representation the presence or absence of a PA loopback module."""

def __init__(self, host, port, buff_sz, tcp_timeout):
"""Initialize PulseAudio server."""
self._pa_host = host
self._pa_port = int(port)
self._buffer_size = int(buff_sz)
self._tcp_timeout = int(tcp_timeout)
def __init__(self, name, pa_server, sink_name, source_name):
"""Initialize the Pulseaudio switch."""
self._module_idx = None
self._name = name
self._sink_name = sink_name
self._source_name = source_name
self._pa_svr = pa_server

def _send_command(self, cmd, response_expected):
"""Send a command to the pa server using a socket."""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(self._tcp_timeout)
def _get_module_idx(self):
try:
sock.connect((self._pa_host, self._pa_port))
_LOGGER.info("Calling pulseaudio: %s", cmd)
sock.send((cmd + "\n").encode("utf-8"))
if response_expected:
return_data = self._get_full_response(sock)
_LOGGER.debug("Data received from pulseaudio: %s", return_data)
else:
return_data = ""
finally:
sock.close()
return return_data

def _get_full_response(self, sock):
"""Get the full response back from pulseaudio."""
result = ""
rcv_buffer = sock.recv(self._buffer_size)
result += rcv_buffer.decode("utf-8")

while len(rcv_buffer) == self._buffer_size:
rcv_buffer = sock.recv(self._buffer_size)
result += rcv_buffer.decode("utf-8")

return result

@util.Throttle(MIN_TIME_BETWEEN_SCANS, MIN_TIME_BETWEEN_FORCED_SCANS)
def update_module_state(self):
"""Refresh state in case an alternate process modified this data."""
self._current_module_state = self._send_command("list-modules", True)
self._pa_svr.connect()

def turn_on(self, sink_name, source_name):
"""Send a command to pulseaudio to turn on the loopback."""
self._send_command(str.format(LOAD_CMD, sink_name, source_name), False)
for module in self._pa_svr.module_list():
if not module.name == "module-loopback":
continue

def turn_off(self, module_idx):
"""Send a command to pulseaudio to turn off the loopback."""
self._send_command(str.format(UNLOAD_CMD, module_idx), False)
if f"sink={self._sink_name}" not in module.argument:
continue

def get_module_idx(self, sink_name, source_name):
"""For a sink/source, return its module id in our cache, if found."""
result = re.search(
str.format(MOD_REGEX, re.escape(sink_name), re.escape(source_name)),
self._current_module_state,
)
if result and result.group(1).isdigit():
return int(result.group(1))
return -1
if f"source={self._source_name}" not in module.argument:
continue

return module.index

class PALoopbackSwitch(SwitchEntity):
"""Representation the presence or absence of a PA loopback module."""
except PulseError:
return None

def __init__(self, hass, name, pa_server, sink_name, source_name):
"""Initialize the Pulseaudio switch."""
self._module_idx = -1
self._hass = hass
self._name = name
self._sink_name = sink_name
self._source_name = source_name
self._pa_svr = pa_server
return None

@property
def available(self):
"""Return true when connected to server."""
return self._pa_svr.connected

@property
def name(self):
Expand All @@ -157,35 +102,25 @@ def name(self):
@property
def is_on(self):
"""Return true if device is on."""
return self._module_idx > 0
return self._module_idx is not None

def turn_on(self, **kwargs):
"""Turn the device on."""
if not self.is_on:
self._pa_svr.turn_on(self._sink_name, self._source_name)
self._pa_svr.update_module_state(no_throttle=True)
self._module_idx = self._pa_svr.get_module_idx(
self._sink_name, self._source_name
self._pa_svr.module_load(
"module-loopback",
args=f"sink={self._sink_name} source={self._source_name}",
)
self.schedule_update_ha_state()
else:
_LOGGER.warning(IGNORED_SWITCH_WARN)

def turn_off(self, **kwargs):
"""Turn the device off."""
if self.is_on:
self._pa_svr.turn_off(self._module_idx)
self._pa_svr.update_module_state(no_throttle=True)
self._module_idx = self._pa_svr.get_module_idx(
self._sink_name, self._source_name
)
self.schedule_update_ha_state()
self._pa_svr.module_unload(self._module_idx)
else:
_LOGGER.warning(IGNORED_SWITCH_WARN)

def update(self):
"""Refresh state in case an alternate process modified this data."""
self._pa_svr.update_module_state()
self._module_idx = self._pa_svr.get_module_idx(
self._sink_name, self._source_name
)
self._module_idx = self._get_module_idx()
3 changes: 3 additions & 0 deletions requirements_all.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1105,6 +1105,9 @@ ptvsd==4.2.8
# homeassistant.components.wink
pubnubsub-handler==1.0.8

# homeassistant.components.pulseaudio_loopback
pulsectl==20.2.4

# homeassistant.components.androidtv
pure-python-adb==0.2.2.dev0

Expand Down

0 comments on commit 187d9ee

Please sign in to comment.