Subscribe, act, publish.
This app is intended to streamline integration with Metro for all Django+Celery users by:
- Handling subscriptions and messages asynchronously with one command
- Executing Celery tasks based on message topics, defined in settings.py
- Retrying failed tasks through your admin dashboard when using the MetroidTask base
- python >= 3.10
- django >= 4.2 - For asgiref, settings
- django-guid >= 3.2.0 - Storing correlation IDs for failed tasks in the database, making debugging easy
- Choose one:
  - celery >= 5.3.0 - Execute tasks based on a subject
  - django-rq >= 2.4.1 - Execute tasks based on a subject
The python manage.py metroid app is fully asynchronous, and has no blocking code. It utilizes Celery to execute tasks.
It works as follows:
1. The app goes through all your configured subscriptions and starts a new async connection for each one of them
2. Metro sends messages on the subscriptions
3. The app selects the messages whose subject matches one you have defined, and queues a Celery task to execute the function specified for that subject
   3.1. If no task is found for that subject, the message is marked as complete
4. The message is marked as complete after the Celery task has successfully been queued
5. If the task fails, an entry is automatically created in your database
6. All failed tasks can be retried manually through the admin dashboard
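The subject-matching step above can be sketched in plain Python. This is an illustration only, not Metroid's actual implementation: `find_handler` is a hypothetical helper, the handler dictionaries mirror the `subject`, `regex` and `handler_function` keys from the example settings in this document, and matching regular expressions with `re.match` is an assumption.

```python
import re
from typing import Optional


def find_handler(subject: str, handlers: list[dict]) -> Optional[dict]:
    """Return the first handler whose subject matches, or None (hypothetical helper)."""
    for handler in handlers:
        if handler.get('regex', False):
            # Assumption: regex subjects are matched from the start of the string.
            if re.match(handler['subject'], subject):
                return handler
        elif handler['subject'] == subject:
            # Plain string subjects must match exactly.
            return handler
    return None


handlers = [
    {'subject': 'MetroDemo/Type/GeekJokes', 'regex': False,
     'handler_function': 'demoproj.demoapp.services.my_func'},
    {'subject': r'^MetroDemo/Type/.*$', 'regex': True,
     'handler_function': 'demoproj.demoapp.services.my_func'},
]

exact = find_handler('MetroDemo/Type/GeekJokes', handlers)
pattern = find_handler('MetroDemo/Type/DadJokes', handlers)
# No handler matches here; such a message would simply be marked as complete.
missing = find_handler('Other/Type/News', handlers)
```

In this sketch the first matching handler wins, so more specific subjects should be listed before broad regular expressions.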
Note: For a complete example, have a look in demoproj/settings.py.
- Create a METROID key in settings.py with all your subscriptions and handlers. Example settings:
METROID = {
    'subscriptions': [
        {
            'topic_name': 'metro-demo',
            'subscription_name': 'sub-metrodemo-metrodemoerfett',
            'connection_string': config('CONNECTION_STRING_METRO_DEMO', None),
            'handlers': [
                {
                    'subject': 'MetroDemo/Type/GeekJokes',
                    'regex': False,
                    'handler_function': 'demoproj.demoapp.services.my_func'
                }
            ],
        },
    ],
    'worker_type': 'celery',  # default
}

The handler_function is defined by providing the full dotted path as a string. For example, from demoproj.demoapp.services import my_func is provided as 'demoproj.demoapp.services.my_func'.
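To illustrate what a full dotted path means in practice, here is a minimal sketch of how such a string could be turned back into a callable. `resolve_dotted_path` is a hypothetical helper written for this example, not necessarily how Metroid resolves handlers, and `json.dumps` stands in for `demoproj.demoapp.services.my_func` so the snippet runs without the demo project.

```python
from importlib import import_module
from typing import Any


def resolve_dotted_path(dotted_path: str) -> Any:
    """Import and return the object named by a full dotted path (hypothetical helper)."""
    # Split 'package.module.attribute' into 'package.module' and 'attribute'.
    module_path, _, attribute = dotted_path.rpartition('.')
    if not module_path:
        raise ImportError(f'{dotted_path!r} is not a full dotted path')
    module = import_module(module_path)
    try:
        return getattr(module, attribute)
    except AttributeError as exc:
        raise ImportError(f'Module {module_path!r} has no attribute {attribute!r}') from exc


# Standard-library stand-in for 'demoproj.demoapp.services.my_func'.
dumps = resolve_dotted_path('json.dumps')
```

Django ships a similar utility, `django.utils.module_loading.import_string`, which may be more convenient inside a Django project.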
The handler's subject can be a regular expression or a string. If a regular expression is provided, the regex key must be set to True. Example:
'handlers': [{'subject': r'^MetroDemo/Type/.*$', 'regex': True, 'handler_function': 'demoproj.demoapp.services.my_func'}],
- Configure Django-GUID by adding the app to your installed apps, to your middlewares and configuring logging as described in the Django-GUID documentation. Make sure you enable the CeleryIntegration:
from django_guid.integrations import CeleryIntegration

DJANGO_GUID = {
    'INTEGRATIONS': [
        CeleryIntegration(
            use_django_logging=True,
            log_parent=True,
        )
    ],
}

Your functions will be called with keyword arguments for message, topic_name, subscription_name and subject. Your function should, in other words, look something like this:
@app.task(base=MetroidTask)
def my_func(*, message: dict, topic_name: str, subscription_name: str, subject: str) -> None:
    ...  # handle the message here
- Ensure you have redis running:
docker-compose up
- Run migrations:
python manage.py migrate
- Create an admin account:
python manage.py createsuperuser
- Start a worker:
celery -A demoproj worker -l info
- Run the subscriber:
python manage.py metroid
- Send messages to Metro. Example code can be found in demoproj/demoapp/services.py
- Run the webserver:
python manage.py runserver 8000
- See failed messages under http://localhost:8000/admin
To contribute, please see CONTRIBUTING.md
