Skip to content

feat(BA-903): Add kafka message queue #4257

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 61 additions & 16 deletions python.lock
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
// "aiohttp_jinja2~=1.6",
// "aiohttp_sse>=2.2",
// "aiohttp~=3.11.16",
// "aiokafka~=0.12.0",
// "aiomonitor~=0.7.0",
// "aioresponses>=0.7.3",
// "aiosqlite~=0.21.0",
Expand Down Expand Up @@ -434,6 +435,49 @@
"requires_python": ">=3.8",
"version": "2.2.0"
},
{
"artifacts": [
{
"algorithm": "sha256",
"hash": "08c84b3894d97fd02fcc8886f394000d0f5ce771fab5c498ea2b0dd2f6b46d5b",
"url": "https://files.pythonhosted.org/packages/d9/20/69f913a76916e94c4e783dc7d0d05a25c384b25faec33e121062c62411fe/aiokafka-0.12.0-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl"
},
{
"algorithm": "sha256",
"hash": "f9e8ab97b935ca681a5f28cf22cf2b5112be86728876b3ec07e4ed5fc6c21f2d",
"url": "https://files.pythonhosted.org/packages/30/84/f1f7e603cd07e877520b5a1e48e006cbc1fe448806cabbaa98aa732f530d/aiokafka-0.12.0-cp313-cp313-macosx_10_13_x86_64.whl"
},
{
"algorithm": "sha256",
"hash": "62423895b866f95b5ed8d88335295a37cc5403af64cb7cb0e234f88adc2dff94",
"url": "https://files.pythonhosted.org/packages/65/ca/42a962033e6a7926dcb789168bce81d0181ef4ddabce454d830b7e62370e/aiokafka-0.12.0.tar.gz"
},
{
"algorithm": "sha256",
"hash": "2c01abf9787b1c3f3af779ad8e76d5b74903f590593bc26f33ed48750503e7f7",
"url": "https://files.pythonhosted.org/packages/6b/67/0154551292ec1c977e5def178ae5c947773e921aefb6877971e7fdf1942e/aiokafka-0.12.0-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl"
},
{
"algorithm": "sha256",
"hash": "ed991c120fe19fd9439f564201dd746c4839700ef270dd4c3ee6d4895f64fe83",
"url": "https://files.pythonhosted.org/packages/d7/c7/5237b3687198c2129c0bafa4a96cf8ae3883e20cc860125bafe16af3778e/aiokafka-0.12.0-cp313-cp313-macosx_11_0_arm64.whl"
}
],
"project_name": "aiokafka",
"requires_dists": [
"async-timeout",
"cramjam; extra == \"snappy\"",
"cramjam; extra == \"zstd\"",
"cramjam>=2.8.0; extra == \"all\"",
"cramjam>=2.8.0; extra == \"lz4\"",
"gssapi; extra == \"all\"",
"gssapi; extra == \"gssapi\"",
"packaging",
"typing-extensions>=4.10.0"
],
"requires_python": ">=3.9",
"version": "0.12.0"
},
{
"artifacts": [
{
Expand Down Expand Up @@ -1044,36 +1088,36 @@
"artifacts": [
{
"algorithm": "sha256",
"hash": "d125cb11e22817f7a2581bade4bf7b75247b401888890239ceb5d3e902ccaf38",
"url": "https://files.pythonhosted.org/packages/e4/5f/032d93e74949222ffbfbc3270f29a3ee423fe648de8a31c49cce0cbb0a09/boto3-1.37.37-py3-none-any.whl"
"hash": "b6d42803607148804dff82389757827a24ce9271f0583748853934c86310999f",
"url": "https://files.pythonhosted.org/packages/d3/87/8189f22ee798177bc7b40afd13f046442c5f91b699e70a950b42ff447e80/boto3-1.37.38-py3-none-any.whl"
},
{
"algorithm": "sha256",
"hash": "752d31105a45e3e01c8c68471db14ae439990b75a35e72b591ca528e2575b28f",
"url": "https://files.pythonhosted.org/packages/82/8c/2ca661db6c9e591d9dc46149b43a91385283c852436ccba62e199643e196/boto3-1.37.37.tar.gz"
"hash": "88c02910933ab7777597d1ca7c62375f52822e0aa1a8e0c51b2598a547af42b2",
"url": "https://files.pythonhosted.org/packages/0d/b5/d1c2e8c484cea43891629bbab6ca90ce9ca932586750bc0e786c8f096ccf/boto3-1.37.38.tar.gz"
}
],
"project_name": "boto3",
"requires_dists": [
"botocore<1.38.0,>=1.37.37",
"botocore<1.38.0,>=1.37.38",
"botocore[crt]<2.0a0,>=1.21.0; extra == \"crt\"",
"jmespath<2.0.0,>=0.7.1",
"s3transfer<0.12.0,>=0.11.0"
],
"requires_python": ">=3.8",
"version": "1.37.37"
"version": "1.37.38"
},
{
"artifacts": [
{
"algorithm": "sha256",
"hash": "eb730ff978f47c02f0c8ed07bccdc0db6d8fa098ed32ac31bee1da0e9be480d1",
"url": "https://files.pythonhosted.org/packages/fe/17/602915b29cb695e1e66f65e33b1026f1534e49975d99ea4e32e58d963542/botocore-1.37.37-py3-none-any.whl"
"hash": "23b4097780e156a4dcaadfc1ed156ce25cb95b6087d010c4bb7f7f5d9bc9d219",
"url": "https://files.pythonhosted.org/packages/55/1b/93f3504afc7c523dcaa8a8147cfc75421983e30b08d9f93a533929589630/botocore-1.37.38-py3-none-any.whl"
},
{
"algorithm": "sha256",
"hash": "3eadde6fed95c4cb469cc39d1c3558528b7fa76d23e7e16d4bddc77250431a64",
"url": "https://files.pythonhosted.org/packages/96/d0/70969515e3ae8ff0fcccf22827d5d131bc7b8729331127415cf8f2861d63/botocore-1.37.37.tar.gz"
"hash": "c3ea386177171f2259b284db6afc971c959ec103fa2115911c4368bea7cbbc5d",
"url": "https://files.pythonhosted.org/packages/34/79/4e072e614339727f79afef704e5993b5b4d2667c1671c757cc4deb954744/botocore-1.37.38.tar.gz"
}
],
"project_name": "botocore",
Expand All @@ -1085,7 +1129,7 @@
"urllib3<1.27,>=1.25.4; python_version < \"3.10\""
],
"requires_python": ">=3.8",
"version": "1.37.37"
"version": "1.37.38"
},
{
"artifacts": [
Expand Down Expand Up @@ -5204,21 +5248,21 @@
"artifacts": [
{
"algorithm": "sha256",
"hash": "ea47eab891afb506f470eee581dcde44d64dc99796665da794da6f83f50f6776",
"url": "https://files.pythonhosted.org/packages/7d/31/85d0264705d8ef47680d28f4dc9bb1e27d8cace785fbe3f8d009fad6cb88/types_setuptools-78.1.0.20250329-py3-none-any.whl"
"hash": "55238c0b18cdc08dd26c32d6d8385ca1ea59b93dde760dae96d15868b7911990",
"url": "https://files.pythonhosted.org/packages/08/43/410e2978d4e2f3d8355e80141a64d89616fada864403479477c1058f056d/types_setuptools-79.0.0.20250422-py3-none-any.whl"
},
{
"algorithm": "sha256",
"hash": "31e62950c38b8cc1c5114b077504e36426860a064287cac11b9666ab3a483234",
"url": "https://files.pythonhosted.org/packages/e9/6e/c54e6705e5fe67c3606e4c7c91123ecf10d7e1e6d7a9c11b52970cf2196c/types_setuptools-78.1.0.20250329.tar.gz"
"hash": "9c9f699a5914d2ed97f02ee749fb2c7bc2898f8dad03b5dd74b74d4f80e29972",
"url": "https://files.pythonhosted.org/packages/ce/61/5b764d556977ab27976f335d7493dc267b0dbcb5fae7fe117547c292c069/types_setuptools-79.0.0.20250422.tar.gz"
}
],
"project_name": "types-setuptools",
"requires_dists": [
"setuptools"
],
"requires_python": ">=3.9",
"version": "78.1.0.20250329"
"version": "79.0.0.20250422"
},
{
"artifacts": [
Expand Down Expand Up @@ -5619,6 +5663,7 @@
"aiohttp_jinja2~=1.6",
"aiohttp_sse>=2.2",
"aiohttp~=3.11.16",
"aiokafka~=0.12.0",
"aiomonitor~=0.7.0",
"aioresponses>=0.7.3",
"aiosqlite~=0.21.0",
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ aiohttp~=3.11.16
aiohttp_cors~=0.8.1
aiohttp_jinja2~=1.6
aiohttp_sse>=2.2
aiokafka~=0.12.0
aiodns>=3.2
aiomonitor~=0.7.0
aioresponses>=0.7.3
Expand Down
118 changes: 118 additions & 0 deletions src/ai/backend/common/message_queue/kafka_queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import asyncio
import logging
from dataclasses import dataclass
from typing import AsyncGenerator

from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
from aiokafka.errors import KafkaError

from ai.backend.common.message_queue.queue import AbstractMessageQueue, MessageId, MQMessage
from ai.backend.logging.utils import BraceStyleAdapter

log = BraceStyleAdapter(logging.getLogger(__spec__.name))


@dataclass
class KafkaMessageQueueArgs:
    """Constructor arguments for ``KafkaMessageQueue``.

    Every client object is supplied by the caller; this container only
    groups them. Field order is part of the positional ``__init__``
    signature and must not be rearranged.
    """

    # Topic name handed to the producer on every send and used in log lines.
    topic: str
    # Client used by ``KafkaMessageQueue.send``.
    producer: AIOKafkaProducer
    # Client whose records feed ``KafkaMessageQueue.consume_queue``.
    consumer: AIOKafkaConsumer
    # Client whose records feed ``KafkaMessageQueue.subscribe_queue``.
    subscriber: AIOKafkaConsumer


class KafkaMessageQueue(AbstractMessageQueue):
    """Message queue that relays records between Kafka clients and asyncio queues.

    Two background tasks are started on construction. Each one repeatedly
    calls ``getone()`` on one of the supplied consumers, wraps the record in
    an ``MQMessage`` and puts it on an in-memory ``asyncio.Queue``.
    ``consume_queue()`` and ``subscribe_queue()`` yield from those queues.

    The producer and both consumers are supplied by the caller. This class
    never starts or stops them; ``close()`` only shuts down the background
    tasks created here.
    """

    _topic: str
    _producer: AIOKafkaProducer
    _consumer: AIOKafkaConsumer
    _subscriber: AIOKafkaConsumer
    _consume_queue: asyncio.Queue[MQMessage]
    _subscribe_queue: asyncio.Queue[MQMessage]
    _closed: bool
    _consumer_task: asyncio.Task
    _subscriber_task: asyncio.Task

    def __init__(
        self,
        args: KafkaMessageQueueArgs,
    ) -> None:
        """Store the supplied clients and start the two relay tasks.

        Must be called while an event loop is running, because
        ``asyncio.create_task`` is invoked here.

        Args:
            args: Topic name plus the producer, consumer and subscriber clients.
        """
        self._topic = args.topic
        self._producer = args.producer
        self._consumer = args.consumer
        self._subscriber = args.subscriber
        self._consume_queue = asyncio.Queue[MQMessage]()
        self._subscribe_queue = asyncio.Queue[MQMessage]()
        self._closed = False

        # Start background tasks
        self._consumer_task = asyncio.create_task(self._consume_messages_loop())
        self._subscriber_task = asyncio.create_task(self._subscribe_messages_loop())

    async def _relay_messages(
        self,
        source: AIOKafkaConsumer,
        sink: asyncio.Queue[MQMessage],
        role: str,
    ) -> None:
        """Copy records from ``source`` into ``sink`` until closed or an error occurs.

        Args:
            source: Kafka consumer to read from, one record at a time.
            sink: In-memory queue that receives the wrapped records.
            role: Label ("consumer" or "subscriber") used only in log lines.
        """
        log.debug("Starting {} loop for topic {}", role, self._topic)
        try:
            while not self._closed:
                record = await source.getone()
                # The message id is the record offset rendered as ASCII digits.
                # NOTE(review): offsets are per-partition, so ids may repeat on
                # a multi-partition topic -- confirm this is acceptable.
                await sink.put(
                    MQMessage(
                        msg_id=str(record.offset).encode(),
                        payload=record.value,
                    )
                )
        except Exception as e:
            # Task cancellation is not an ``Exception`` subclass, so close()
            # never lands here. Any other failure ends the loop; it is logged
            # with a traceback unless the queue is already shutting down.
            if not self._closed:
                log.exception("Error in {} loop: {}", role, e)

    async def _consume_messages_loop(self) -> None:
        """Background task that continuously reads messages for consumers"""
        await self._relay_messages(self._consumer, self._consume_queue, "consumer")

    async def _subscribe_messages_loop(self) -> None:
        """Background task that continuously reads messages for subscribers"""
        await self._relay_messages(self._subscriber, self._subscribe_queue, "subscriber")

    async def send(self, payload: dict[bytes, bytes]) -> None:
        """Hand ``payload`` to the producer for the configured topic.

        Args:
            payload: Message body, passed to the producer unchanged as ``value``.
                NOTE(review): presumably the producer is configured with a
                value serializer that accepts a dict -- verify at the call site.

        Raises:
            RuntimeError: If the queue is closed, or if the producer raises
                ``KafkaError`` (the original error is chained as ``__cause__``).
        """
        if self._closed:
            raise RuntimeError("Queue is closed")
        try:
            # NOTE(review): the value returned by ``send`` is discarded; if it
            # is a delivery future (as the aiokafka docs describe), delivery
            # failures are not reported to the caller -- confirm whether
            # ``send_and_wait`` is wanted here.
            await self._producer.send(
                self._topic,
                value=payload,
            )
        except KafkaError as e:
            raise RuntimeError(f"Failed to send message: {e}") from e

    async def consume_queue(self) -> AsyncGenerator[MQMessage, None]:  # type: ignore
        """Yield messages relayed from the consumer client.

        Yields nothing if the queue is already closed. Iteration ends quietly
        when the awaiting task is cancelled while waiting for a message.
        NOTE(review): ``close()`` puts nothing on the queue, so a caller that
        is blocked waiting for a message is not woken by it.
        """
        while not self._closed:
            try:
                yield await self._consume_queue.get()
            except asyncio.CancelledError:
                break

    async def subscribe_queue(self) -> AsyncGenerator[MQMessage, None]:  # type: ignore
        """Yield messages relayed from the subscriber client.

        Yields nothing if the queue is already closed. Iteration ends quietly
        when the awaiting task is cancelled while waiting for a message.
        NOTE(review): ``close()`` puts nothing on the queue, so a caller that
        is blocked waiting for a message is not woken by it.
        """
        while not self._closed:
            try:
                yield await self._subscribe_queue.get()
            except asyncio.CancelledError:
                break

    async def done(self, msg_id: MessageId) -> None:
        """Acknowledge ``msg_id``; a no-op in this implementation.

        NOTE(review): this assumes the supplied consumers commit offsets
        automatically; they are configured by the caller, so verify.
        """
        # Kafka handles message acknowledgment automatically
        pass

    async def close(self) -> None:
        """Mark the queue closed and stop both background tasks.

        Safe to call more than once; later calls return immediately. The
        producer and consumers supplied at construction are not stopped here.
        """
        if self._closed:
            return
        self._closed = True
        relay_tasks = (self._consumer_task, self._subscriber_task)
        for task in relay_tasks:
            task.cancel()
        # Wait for the cancellations to finish so no task is still pending
        # when close() returns. return_exceptions=True turns each task's
        # CancelledError into a result instead of raising it here.
        await asyncio.gather(*relay_tasks, return_exceptions=True)
Loading