A sample Symfony project that uses the Messenger component with Apache Kafka.
This repository contains a Symfony project together with Kafka and ZooKeeper servers.
All services run in Docker.
In this example, the Messenger component is configured to:
- Publish messages to a Kafka topic
- Consume messages from Kafka topics
Messenger does not ship with a Kafka transport by default, which is why this project provides a custom transport in the kafka directory.
Requirements:
- git
- docker
- docker-compose
- make
git clone https://github.com/symfony-examples/messenger-kafka.git
The following command creates all services and the Kafka topic:
make install-local
Enjoy ! 🥳
make console app:messenger:producer reference 2000
reference and 2000 are required arguments; you can replace them with other values.
This command sends a message to the Kafka topic.
The topic is defined in config/packages/messenger.yaml under the producer_topic option:
framework:
    messenger:
        transports:
            order_transport:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    ...
                    producer_topic: 'order_topic_test'
make console messenger:consume order_transport
order_transport is the transport name defined in config/packages/messenger.yaml.
This command consumes messages from the Kafka topics defined in config/packages/messenger.yaml under the consumer_topics option:
framework:
    messenger:
        transports:
            order_transport:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    ...
                    consumer_topics:
                        - 'order_topic_test'
Messages are handled by App\Messenger\Handler\OrderPaidHandler.
In this example, an InvoiceCreatedMessage is sent for each paid order. You can replace this implementation with your own logic.
// App\Messenger\Handler\OrderPaidHandler
public function __invoke(OrderPaidMessage $message): void
{
    // implement logic here
}
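As a rough illustration of the behaviour described above (one InvoiceCreatedMessage per paid order), here is a minimal, framework-free sketch. The message property names (`reference`, `amount`, `orderReference`) are assumptions made for illustration, and a plain `Closure` stands in for Symfony's message bus so that the snippet runs without the framework. The handler in this repository may look different.

```php
<?php
declare(strict_types=1);

// Hypothetical message shapes: the property names are assumptions.
final class OrderPaidMessage
{
    public function __construct(
        public readonly string $reference,
        public readonly int $amount,
    ) {
    }
}

final class InvoiceCreatedMessage
{
    public function __construct(
        public readonly string $orderReference,
        public readonly int $amount,
    ) {
    }
}

final class OrderPaidHandler
{
    // $dispatch is a stand-in for the message bus (assumption, not the project's wiring).
    public function __construct(private readonly Closure $dispatch)
    {
    }

    public function __invoke(OrderPaidMessage $message): void
    {
        // Emit one invoice message for each paid order.
        ($this->dispatch)(new InvoiceCreatedMessage($message->reference, $message->amount));
    }
}

// Usage: collect dispatched messages in an array instead of sending them to Kafka.
$sent = [];
$handler = new OrderPaidHandler(function (object $m) use (&$sent): void {
    $sent[] = $m;
});
$handler(new OrderPaidMessage('reference', 2000));
```

Injecting the dispatcher keeps the handler easy to test in isolation. In a Symfony application the same role would normally be played by an injected message bus service.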
The rdkafka extension must be installed.
; kafka.ini
extension=rdkafka.so
## SETUP RDKAFKA EXTENSION
RUN set -xe \
    && apk add --no-cache --update --virtual .phpize-deps $PHPIZE_DEPS \
        librdkafka-dev \
    && pecl install rdkafka
COPY ./.docker/php/kafka.ini $PHP_INI_DIR/conf.d/
Check that the extension is installed correctly:
php --ri rdkafka
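The same check can also be made from PHP code, for example in a health check or at application boot. This is only a small sketch using PHP's built-in `extension_loaded()` and `phpversion()` functions. Where such a check would live in this project is not specified here.

```php
<?php
declare(strict_types=1);

// True when the rdkafka extension is loaded in the current PHP runtime.
$loaded = extension_loaded('rdkafka');

// phpversion('rdkafka') returns the extension version, or false when it is not loaded.
$version = phpversion('rdkafka');

echo $loaded
    ? 'rdkafka is loaded (version ' . $version . ')'
    : 'rdkafka is not loaded',
    PHP_EOL;
```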
Add the following environment variables to the .env file:
# transport dsn must start with kafka://
MESSENGER_TRANSPORT_DSN=kafka://
# comma-separated list of Kafka brokers (e.g. kafka-1:9092,kafka-2:9092)
KAFKA_BROKERS=kafka:9092
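To make the two constraints above concrete (the DSN must start with kafka://, and brokers are given as a comma-separated list), here is a small sketch in plain PHP. The function names `supportsKafkaDsn` and `parseBrokers` are hypothetical and are not taken from this repository. librdkafka accepts the comma-separated string as-is, so splitting it is only useful for validation or logging.

```php
<?php
declare(strict_types=1);

// Hypothetical helper: a transport factory typically uses a check like this
// to decide whether it is responsible for a given DSN.
function supportsKafkaDsn(string $dsn): bool
{
    return str_starts_with($dsn, 'kafka://');
}

/**
 * Hypothetical helper: split a KAFKA_BROKERS value into individual host:port entries.
 *
 * @return list<string>
 */
function parseBrokers(string $brokers): array
{
    $parts = array_map('trim', explode(',', $brokers));

    // Drop empty entries such as those produced by a trailing comma.
    return array_values(array_filter($parts, static fn (string $b): bool => $b !== ''));
}
```

For example, `supportsKafkaDsn('kafka://')` is true while `supportsKafkaDsn('amqp://localhost')` is false.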
Configure your transport
framework:
    messenger:
        transports:
            order_transport:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    metadata.broker.list: '%env(KAFKA_BROKERS)%'
                    group.id: 'my-group-id'
                    auto.offset.reset: 'earliest'
                    # you can add here any rdkafka option you need
                    # https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md
                    ...
                    consumer_topics:
                        - 'order_topic_test'
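The options block mixes librdkafka settings (such as group.id) with transport-level keys (producer_topic, consumer_topics). One way a custom transport could separate the two and feed the former into `RdKafka\Conf` is sketched below. The `splitOptions` function is hypothetical, and this is an assumption about how such a transport might work, not a description of the code in the kafka directory.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: separate librdkafka settings from transport-level keys.
 *
 * @return array{0: array<string, string>, 1: array<string, mixed>}
 */
function splitOptions(array $options): array
{
    $transportKeys = ['producer_topic', 'consumer_topics'];
    $rdkafka = [];
    $transport = [];

    foreach ($options as $key => $value) {
        if (in_array($key, $transportKeys, true)) {
            $transport[$key] = $value;
            continue;
        }
        // librdkafka expects string values; booleans are written as "true"/"false".
        $rdkafka[$key] = is_bool($value) ? ($value ? 'true' : 'false') : (string) $value;
    }

    return [$rdkafka, $transport];
}

[$rdkafka, $transport] = splitOptions([
    'metadata.broker.list' => 'kafka:9092',
    'group.id' => 'my-group-id',
    'auto.offset.reset' => 'earliest',
    'consumer_topics' => ['order_topic_test'],
]);

// Only touch RdKafka\Conf when the extension is actually available.
if (extension_loaded('rdkafka')) {
    $conf = new RdKafka\Conf();
    foreach ($rdkafka as $name => $value) {
        $conf->set($name, $value);
    }
}
```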
Set up the transport:
make console messenger:setup-transports
Enjoy ! 🥳