Skip to content

Commit 0986639

Browse files
committed
add kombu.transport.django
1 parent a0175b0 commit 0986639

File tree

11 files changed

+459
-1
lines changed

11 files changed

+459
-1
lines changed

docs/includes/introduction.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,8 @@ Transport Comparison
9393
+---------------+----------+------------+------------+---------------+--------------+-----------------------+
9494
| *Pyro* | Virtual | Yes | Yes [#f1]_ | No | No | No |
9595
+---------------+----------+------------+------------+---------------+--------------+-----------------------+
96+
| *Django* | Virtual | Yes | Yes | Yes | Yes | Yes |
97+
+---------------+----------+------------+------------+---------------+--------------+-----------------------+
9698

9799

98100
.. [#f1] Declarations only kept in memory, so exchanges/queues
@@ -264,4 +266,3 @@ There are some concepts you should be familiar with before starting:
264266
zero or more words. For example `"*.stock.#"` matches the
265267
routing keys `"usd.stock"` and `"eur.stock.db"` but not
266268
`"stock.nasdaq"`.
267-

docs/userguide/connections.rst

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,9 @@ All of these are valid URLs:
147147
# Using Pyro with name server running on 'localhost'
148148
pyro://localhost/kombu.broker
149149
150+
# Using Django
151+
django:///
152+
150153
151154
The query part of the URL can also be used to set options, e.g.:
152155

kombu/transport/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ def supports_librabbitmq() -> bool | None:
4545
'azureservicebus': 'kombu.transport.azureservicebus:Transport',
4646
'pyro': 'kombu.transport.pyro:Transport',
4747
'gcpubsub': 'kombu.transport.gcpubsub:Transport',
48+
'django': 'kombu.transport.django.transport.Transport',
4849
}
4950

5051
_transport_cache = {}

kombu/transport/django/apps.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
from __future__ import annotations
2+
3+
from django.apps import AppConfig
4+
5+
6+
class KombuConfig(AppConfig):
7+
"""Django app config."""
8+
9+
default_auto_field = "django.db.models.BigAutoField"
10+
name = "kombu.transport.django"
11+
label = "kombu"
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
"""The initial migration for the Django transport."""
2+
3+
# Generated by Django 5.1.5 on 2025-01-20 18:26
4+
5+
from __future__ import annotations
6+
7+
import django.db.models.deletion
8+
from django.db import migrations, models
9+
10+
11+
class Migration(migrations.Migration):
12+
"""The initial migration."""
13+
14+
initial = True
15+
16+
dependencies = []
17+
18+
operations = [
19+
migrations.CreateModel(
20+
name="Exchange",
21+
fields=[
22+
(
23+
"id",
24+
models.BigAutoField(
25+
auto_created=True,
26+
primary_key=True,
27+
serialize=False,
28+
verbose_name="ID",
29+
),
30+
),
31+
("name", models.CharField(max_length=200, unique=True)),
32+
],
33+
),
34+
migrations.CreateModel(
35+
name="Queue",
36+
fields=[
37+
(
38+
"id",
39+
models.BigAutoField(
40+
auto_created=True,
41+
primary_key=True,
42+
serialize=False,
43+
verbose_name="ID",
44+
),
45+
),
46+
("name", models.CharField(max_length=200, unique=True)),
47+
],
48+
),
49+
migrations.CreateModel(
50+
name="Message",
51+
fields=[
52+
(
53+
"id",
54+
models.BigAutoField(
55+
auto_created=True,
56+
primary_key=True,
57+
serialize=False,
58+
verbose_name="ID",
59+
),
60+
),
61+
("visible", models.BooleanField(db_index=True, default=True)),
62+
(
63+
"sent_at",
64+
models.DateTimeField(auto_now_add=True, db_index=True, null=True),
65+
),
66+
("message", models.TextField()),
67+
("version", models.PositiveIntegerField(default=1)),
68+
("priority", models.PositiveIntegerField(default=0)),
69+
("ttl", models.IntegerField(blank=True, null=True)),
70+
(
71+
"queue",
72+
models.ForeignKey(
73+
on_delete=django.db.models.deletion.CASCADE,
74+
related_name="messages",
75+
to="kombu.queue",
76+
),
77+
),
78+
],
79+
),
80+
migrations.CreateModel(
81+
name="Binding",
82+
fields=[
83+
(
84+
"id",
85+
models.BigAutoField(
86+
auto_created=True,
87+
primary_key=True,
88+
serialize=False,
89+
verbose_name="ID",
90+
),
91+
),
92+
("routing_key", models.CharField(max_length=255, null=True)),
93+
(
94+
"exchange",
95+
models.ForeignKey(
96+
on_delete=django.db.models.deletion.CASCADE,
97+
related_name="bindings",
98+
to="kombu.exchange",
99+
),
100+
),
101+
(
102+
"queue",
103+
models.ForeignKey(
104+
on_delete=django.db.models.deletion.CASCADE,
105+
related_name="bindings",
106+
to="kombu.queue",
107+
),
108+
),
109+
],
110+
),
111+
]
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
"""The migration files for the Django transport."""

kombu/transport/django/models.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
from __future__ import annotations
2+
3+
from datetime import timedelta
4+
5+
from django.db import models
6+
from django.utils import timezone
7+
8+
9+
class Queue(models.Model):
10+
"""The queue."""
11+
12+
name = models.CharField(max_length=200, unique=True)
13+
14+
def __str__(self):
15+
return self.name
16+
17+
18+
class Message(models.Model):
19+
"""The message."""
20+
21+
visible = models.BooleanField(default=True, db_index=True)
22+
sent_at = models.DateTimeField(null=True, db_index=True, auto_now_add=True)
23+
message = models.TextField()
24+
version = models.PositiveIntegerField(default=1)
25+
priority = models.PositiveIntegerField(default=0)
26+
ttl = models.IntegerField(
27+
null=True, blank=True
28+
) # TTL in seconds (null means no TTL)
29+
queue = models.ForeignKey(Queue, on_delete=models.CASCADE, related_name="messages")
30+
31+
def __str__(self):
32+
return f"{self.sent_at} {self.message} {self.queue_id}"
33+
34+
def is_expired(self):
35+
if self.ttl is None:
36+
return False # No TTL set, so not expired
37+
expiration_time = self.sent_at + timedelta(seconds=self.ttl)
38+
return expiration_time < timezone.now()
39+
40+
41+
class Exchange(models.Model):
42+
"""The exchange."""
43+
44+
name = models.CharField(max_length=200, unique=True)
45+
46+
def __str__(self):
47+
return f"{self.name}"
48+
49+
50+
class Binding(models.Model):
51+
"""The binding."""
52+
53+
queue = models.ForeignKey(Queue, on_delete=models.CASCADE, related_name="bindings")
54+
exchange = models.ForeignKey(
55+
Exchange, on_delete=models.CASCADE, related_name="bindings"
56+
)
57+
routing_key = models.CharField(max_length=255, null=True)
58+
59+
def __str__(self):
60+
return f"Binding: {self.queue.name} -> {self.exchange.name} with routing_key {self.routing_key}"
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
"""Django Transport module for kombu.
2+
3+
Kombu transport using Django ORM as the message store.
4+
5+
Features
6+
========
7+
* Type: Virtual
8+
* Supports Direct: Yes
9+
* Supports Topic: Yes
10+
* Supports Fanout: Yes
11+
* Supports Priority: Yes
12+
* Supports TTL: Yes
13+
14+
Connection String
15+
=================
16+
17+
.. code-block::
18+
19+
django:///
20+
"""
21+
22+
from __future__ import annotations
23+
24+
import json
25+
import logging
26+
from queue import Empty
27+
28+
from django.db import transaction
29+
30+
from kombu.transport import virtual
31+
from kombu.transport.django.models import Binding, Exchange, Message, Queue
32+
33+
VERSION = (0, 0, 1)
34+
__version__ = ".".join(map(str, VERSION))
35+
36+
logger = logging.getLogger(__name__)
37+
38+
39+
class Channel(virtual.Channel):
40+
"""The channel class."""
41+
42+
supports_fanout = True
43+
44+
def _open(self):
45+
pass
46+
47+
def _put(self, queue, message, priority=0, ttl=None, **kwargs):
48+
queue_instance, _ = Queue.objects.get_or_create(name=queue)
49+
queue_instance.messages.create(
50+
message=json.dumps(message), priority=priority, ttl=ttl
51+
)
52+
53+
def _get(self, queue, timeout=None):
54+
with transaction.atomic():
55+
try:
56+
queue_instance = Queue.objects.get(name=queue)
57+
except Queue.DoesNotExist:
58+
raise Empty()
59+
message_instance = (
60+
Message.objects.select_for_update(skip_locked=True)
61+
.filter(visible=True, queue=queue_instance)
62+
.order_by("priority", "sent_at", "id")
63+
.first()
64+
)
65+
if message_instance is not None:
66+
if message_instance.is_expired():
67+
message_instance.visible = False
68+
message_instance.save(update_fields=["visible"])
69+
logger.debug(
70+
f"Message with ID {message_instance.id} has expired and is discarded."
71+
)
72+
return self._get(queue, timeout=timeout)
73+
74+
message_instance.visible = False
75+
message_instance.save(update_fields=["visible"])
76+
msg = message_instance.message
77+
return json.loads(msg)
78+
raise Empty()
79+
80+
def _purge(self, queue):
81+
try:
82+
queue_instance = Queue.objects.get(name=queue)
83+
except Queue.DoesNotExist:
84+
return
85+
queue_instance.messages.all().delete()
86+
87+
def _queue_bind(self, exchange, routing_key, pattern, queue):
88+
queue_instance, _ = Queue.objects.get_or_create(name=queue)
89+
exchange_instance, _ = Exchange.objects.get_or_create(name=exchange)
90+
binding, created = Binding.objects.get_or_create(
91+
queue=queue_instance,
92+
exchange=exchange_instance,
93+
routing_key=routing_key,
94+
)
95+
if created:
96+
logger.debug(f"Binding created: {binding}")
97+
else:
98+
logger.debug(f"Binding already exists: {binding}")
99+
100+
def _put_fanout(self, exchange, message, routing_key, priority=0, **kwargs):
101+
try:
102+
exchange_instance = Exchange.objects.get(name=exchange)
103+
except Exchange.DoesNotExist:
104+
return
105+
queues = Queue.objects.filter(
106+
bindings__exchange=exchange_instance, bindings__routing_key=routing_key
107+
)
108+
logger.debug(
109+
f"Found {len(queues)} queues bound to fanout exchange {exchange_instance.name}"
110+
)
111+
for queue in queues:
112+
# Publish the message to each bound queue
113+
logger.debug(f"Publishing message to fanout queue: {queue.name}")
114+
self._put(queue.name, message, priority=priority)
115+
116+
def get_table(self, exchange):
117+
try:
118+
exchange_instance = Exchange.objects.get(name=exchange)
119+
except Exchange.DoesNotExist:
120+
return []
121+
bindings = exchange_instance.bindings.all()
122+
return [(binding.routing_key, "", binding.queue.name) for binding in bindings]
123+
124+
125+
class Transport(virtual.Transport):
126+
"""The transport class."""
127+
128+
Channel = Channel
129+
130+
can_parse_url = True
131+
driver_type = "django"
132+
driver_name = "django"
133+
134+
implements = virtual.Transport.implements.extend(
135+
exchange_type=frozenset(["direct", "topic", "fanout"])
136+
)

requirements/extras/django.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
django>=4.2.18

requirements/test-ci.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,4 @@ urllib3>=1.26.16; sys_platform != 'win32'
1717
-r extras/sqlalchemy.txt
1818
-r extras/etcd.txt
1919
-r extras/gcpubsub.txt
20+
-r extras/django.txt

0 commit comments

Comments
 (0)