Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ['3.7', '3.8', '3.9', '3.10']
python-version: ['3.8', '3.9', '3.10']
steps:
- uses: actions/checkout@v2
- name: Set up Python ${{ matrix.python-version }}
Expand Down
1 change: 1 addition & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ Extra requirements

- `aiohttp <https://aiohttp.readthedocs.io>`_
- `aio_pika <https://aio-pika.readthedocs.io>`_
- `asyncio-paho <https://github.com/toreamun/asyncio-paho>`_
- `flask <https://flask.palletsprojects.com>`_
- `jsonschema <https://python-jsonschema.readthedocs.io>`_
- `kombu <https://kombu.readthedocs.io/en/stable/>`_
Expand Down
1 change: 1 addition & 0 deletions docs/source/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ Extra requirements

- `aiohttp <https://aiohttp.readthedocs.io>`_
- `aio_pika <https://aio-pika.readthedocs.io>`_
- `asyncio-paho <https://github.com/toreamun/asyncio-paho>`_
- `flask <https://flask.palletsprojects.com>`_
- `jsonschema <https://python-jsonschema.readthedocs.io>`_
- `kombu <https://kombu.readthedocs.io/en/stable/>`_
Expand Down
7 changes: 7 additions & 0 deletions docs/source/pjrpc/examples.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@ Examples
========


aio_paho client
---------------

.. literalinclude:: ../../../examples/aio_paho_client.py
:language: python


aio_pika client
---------------

Expand Down
75 changes: 75 additions & 0 deletions examples/aio_paho_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import asyncio
import logging
import ssl
from os import environ
from typing import Tuple

from asyncio_paho import AsyncioPahoClient

import pjrpc
from pjrpc.client.backend import aio_paho


def get_broker() -> Tuple[str, str, int]:
broker = environ.get("MQTT_BROKER")
assert broker
try:
scheme_idx = broker.index("://")
transport = broker[:scheme_idx]
broker = broker[scheme_idx + 3:]
print(transport)
if transport == "wss":
transport = "websockets"
except ValueError:
transport = "tcp"
try:
port = int(broker[broker.index(":") + 1:])
broker = broker[: broker.index(":")]
except ValueError:
port = 1883
print(transport, broker, port)
return transport, broker, port


async def main() -> None:
rpc = aio_paho.Client(debug=True)
transport, broker, port = get_broker()
rpc.client = AsyncioPahoClient(
transport=transport,
client_id=environ.get("MQTT_CLIENTID", ""),
)
if port == 443:
rpc.client.tls_set("pki.autonoma.cloud-rootca.cer")
elif port == 8883:
rpc.client.tls_set(cert_reqs=ssl.CERT_NONE)
# To disable verification of the server hostname in the server certificate:
# rpc._client.tls_insecure_set(True)

username = environ.get("MQTT_USERNAME", "")
password = environ.get("MQTT_PASSWORD", "")
rpc.client.username_pw_set(username, password)

request_topic = environ.get("MQTT_RPC_REQUEST_TOPIC", "")
response_topic = environ.get("MQTT_RPC_RESPONSE_TOPIC", "")
rpc.topics(request_topic=request_topic, response_topic=response_topic)

await rpc.connect(broker, port)

await rpc.notify('schedule_restart')
await asyncio.sleep(1)
response = await rpc.send(pjrpc.Request('get_methods', params=None, id=1))
assert response
print(response.result)

result = await rpc('get_methods')
print(result)

result = await rpc.proxy.get_methods()
print(result)

await rpc.notify('schedule_shutdown')


if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
asyncio.run(main())
81 changes: 81 additions & 0 deletions examples/aio_paho_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
#!/usr/bin/python3
import asyncio
import logging
import ssl
import sys
from os import environ, execv
from typing import List

from asyncio_paho import AsyncioPahoClient

import pjrpc
from pjrpc.server.integration.aio_paho import Executor

methods = pjrpc.server.MethodRegistry()


@methods.add
def get_methods() -> List[str]:
return ["sum", "tick"]


@methods.add
def sum(a: int, b: int) -> int:
"""RPC method implementing calls to sum(1, 2) -> 3"""
return a + b


@methods.add
def tick() -> None:
"""RPC method implementing notification 'tick'"""
print("examples/aio_pika_server.py: received tick")


async def setup_mqtt_server_connection() -> Executor:
rpc = Executor(debug_messages=True)
rpc.client = AsyncioPahoClient(client_id=environ.get("DEV_MQTT_CLIENTID", ""))

username = environ.get("DEV_MQTT_USER", "")
password = environ.get("DEV_MQTT_PASSWORD", "")
rpc.client.username_pw_set(username, password)

request_topic = environ.get("MQTT_RPC_REQUEST_TOPIC", "")
response_topic = environ.get("MQTT_RPC_RESPONSE_TOPIC", "")
rpc.topics(request_topic=request_topic, response_topic=response_topic)
return rpc


async def server() -> None:
handle_requests = True
rpc = await setup_mqtt_server_connection()

@methods.add
def schedule_restart() -> None:
"""Schedule a restart, allows for an ack and response delivery"""
loop = asyncio.get_event_loop()
loop.call_later(0.01, execv(__file__, sys.argv))

@methods.add
def schedule_shutdown() -> None:
"""Schedule a shutdown, allows for an orderly disconnect from the server"""
nonlocal handle_requests
handle_requests = False
loop = asyncio.get_event_loop()
loop.call_later(0.1, loop.stop)

rpc.dispatcher.add_methods(methods)
broker = environ.get("MQTT_SSL_BROKER")
assert broker
# Connect to broker using mqtts (mqtt+tls) on port 8883:
rpc.client.tls_set(cert_reqs=ssl.CERT_NONE)
# To disable verification of the server hostname in the server certificate:
# rpc.client.tls_insecure_set(True)
await rpc.connect(broker, port=8883)
while handle_requests:
await rpc.handle_messages()
await rpc.disconnect()


if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
asyncio.run(server())
140 changes: 140 additions & 0 deletions pjrpc/client/backend/aio_paho.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
"""Based on pjrpc/client/backend/aio_pika.py (but much simpler due to MQTT),
this module implements the JSON-RPC backend for JSON-RPC over MQTT brokers"""
import asyncio
import logging
from typing import Any, List, Optional

import paho.mqtt.client as paho # type: ignore
from asyncio_paho import AsyncioPahoClient # type: ignore

from pjrpc.client import AbstractAsyncClient

debug = logging.getLogger(__package__).debug


class Client(AbstractAsyncClient):
client: AsyncioPahoClient
"""
JSON-RPC client based on `asyncio-mqtt <https://github.com/sbtinstruments/asyncio-mqtt/>`_

:param debug: Whether to enable debugging for the paho mqtt backend
:param kwargs: parameters to be passed to :py:class:`pjrpc.client.AbstractClient`
"""

def __init__(self, **kwargs: Any):
self._debug = kwargs.pop("debug", False)
super().__init__(**kwargs)

def topics(
self,
request_topic: str,
response_topic: str,
**kwargs: Any,
) -> None:
"""Defines the topics for publishing and subscribing at the broker."""
self._request_topic = request_topic

subscribe_result: tuple[int, int] = (-1, -1)
self._subscribed_future: asyncio.Future[str] = asyncio.Future()
self._rpc_futures: List[asyncio.Future[str]] = []
if "debug" in kwargs:
self._debug = kwargs.pop("debug")

def on_connect(
client: paho.Client,
userdata: Any,
flags_dict: dict[str, Any],
result: int,
) -> None:
# pylint: disable=unused-argument
nonlocal subscribe_result
if self._debug:
debug(f"aio_paho: Connected, subscribe to: {response_topic}")
subscribe_result = client.subscribe(response_topic)
assert subscribe_result[0] == paho.MQTT_ERR_SUCCESS
if self._debug:
debug(f"aio_paho: Subscribed to {response_topic}")

def on_subscribe(
client: paho.Client,
userdata: Any,
mid: int,
granted_qos: tuple[int, ...],
) -> None:
# pylint: disable=unused-argument
if self._debug:
debug(f"aio_paho: Subscribed to: {response_topic}")
nonlocal subscribe_result
assert mid == subscribe_result[1]
self._subscribed_future.set_result("")

def on_message(client: paho.Client, userdt: Any, msg: paho.MQTTMessage) -> None:
# pylint: disable=unused-argument
if self._debug:
debug(f"aio_paho: Received from {msg.topic}: {str(msg.payload)}")
future = self._rpc_futures[-1]
future.set_result(msg.payload.decode())

def on_connect_fail(client: paho.Client, userdata: Any) -> None:
# pylint: disable=unused-argument
debug("aio_paho: Connect failed")

def on_log(client: paho.Client, userdata: Any, level: int, buf: Any) -> None:
# pylint: disable=unused-argument
debug(f"aio_paho: {buf}")

self.client.on_connect = on_connect
self.client.on_connect_fail = on_connect_fail
self.client.on_subscribe = on_subscribe
self.client.on_message = on_message
if self._debug:
self.client.on_log = on_log

async def connect(
self,
host: str,
port: int = 1883,
keepalive: int = 60,
bind_address: str = "",
bind_port: int = 0,
clean_start: bool | int = paho.MQTT_CLEAN_START_FIRST_ONLY,
properties: paho.Properties | None = None,
) -> None:
"""Opens a connection to the broker."""
self.client.connect_async(
host,
port,
keepalive,
bind_address,
bind_port,
clean_start,
properties,
)
await self._subscribed_future

async def close(self) -> None:
"""Close the current connection to the MQTT broker and send exceptions."""
await self.client.close()
for future in self._rpc_futures:
if future.done():
continue
future.set_exception(asyncio.CancelledError)

async def _request(
self,
request_text: str,
is_notification: bool = False,
**kwargs: Any,
) -> Optional[str]:
"""Publish an RPC request to the MQTT topic and return the received result"""
if not is_notification:
future: asyncio.Future[str] = asyncio.Future()
self._rpc_futures.append(future)
if self._debug:
debug(f"aio_paho: {self._request_topic}: publish '{request_text}'")
self.client.publish(self._request_topic, request_text.encode())
if is_notification:
return None
received = await future
self._rpc_futures.pop()
return received
Loading