Skip to content

Latest commit

 

History

History
137 lines (127 loc) · 4.63 KB

README.md

File metadata and controls

137 lines (127 loc) · 4.63 KB

Symfony & Kafka

SF Messenger Kafka CI SF Messenger Kafka Security SF Messenger Kafka Packages retention policy

Screenshot 2024-03-25 at 15 26 41

About

A sample Symfony project implementing messenger component with apache kafka service.
This repository contains a symfony project, kafka and zookeeper servers.
Services run on docker.
In this example we have configured messenger component to :

  • Publish message into kafka topic
  • Consume messages from kafka topics

By default, kafka transport is not implemented by messenger that's why we created custom transport in kafka directory.

Requirements

  • git
  • docker
  • docker-compose
  • make

How to

Clone the project

git clone https://github.com/symfony-examples/messenger-kafka.git

Installation

This command will create all services and the kafka topic

make install-local

Enjoy ! 🥳

Check if all is done

Producer

make console app:messenger:producer reference 2000

reference and 2000 are required arguments, you can replace them by other values
This command will send a message to kafka topic
The topic is defined in config/packages/messenger.yaml producer_topic

framework:
    messenger:
        transports:
            order_transport:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    ...
                    producer_topic: 'order_topic_test'

Consumer

make console messenger:consume order_transport

order_transport is the transport name defined in config/packages/messenger.yaml
This command will consume messages from the kafka topic define in config/packages/messenger.yaml consumer_topics

framework:
    messenger:
        transports:
            order_transport:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    ...
                    consumer_topics:
                        - 'order_topic_test'

Messages will be handled by App\Messenger\Handler\OrderPaidHandler
In this example we send an InvoiceCreatedMessage for each order paid, you can update this implementation and put your custom logic here.

// App\Messenger\Handler\OrderPaidHandler
public function __invoke(OrderPaidMessage $message): void
{
    // implement logic here
}

Setup in your symfony project

PHP extension

rdkafka extension must be installed.

; kafka.ini
extension=rdkafka.so
## SETUP RDKAFKA EXTESIONS
RUN set -xe \
    && apk add --no-cache --update --virtual .phpize-deps $PHPIZE_DEPS \
    librdkafka-dev \
    && pecl install rdkafka
COPY ./.docker/php/kafka.ini $PHP_INI_DIR/conf.d/

Check if the extension is well installed

php --ri rdkafka

Config env

Add environment variables in .env file:

# transport dsn must start with kafka://
MESSENGER_TRANSPORT_DSN=kafka://
# kafka broker list separate with comma (exp: kafka-1:9092,kafka-2:9092)
KAFKA_BROKERS=kafka:9092

Config messenger

Configure your transport

framework:
    messenger:
        transports:
            order_transport:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    metadata.broker.list: '%env(KAFKA_BROKERS)%'
                    group.id: 'my-group-id'
                    auto.offset.reset: 'earliest'
                    # you can add here any rdkafka option you need
                    # https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md
                    ...
                    consumer_topics:
                        - 'order_topic_test'

Setup transport

make console messenger:setup-transport

Enjoy ! 🥳

References