This project contains a Docker Compose config example for development needs: it sets up Zookeeper, Kafka, Schema Registry and REST Proxy using Confluent 5.5.2 images.

Run `docker-compose up zookeeper kafka` and you will be able to connect to Kafka at `host.docker.internal:29092` and to Schema Registry at `host.docker.internal:8090` from within your Docker apps.
- List topics:

  ```shell
  docker exec -t kafka_kafka_1 kafka-topics --bootstrap-server kafka:9092 --list
  ```

- Create a topic named `t1`:

  ```shell
  docker exec -t kafka_kafka_1 kafka-topics --bootstrap-server kafka:9092 --create --topic t1 --partitions 1 --replication-factor 1
  ```

- Describe a topic by name:

  ```shell
  docker exec -t kafka_kafka_1 kafka-topics --bootstrap-server kafka:9092 --describe --topic t1
  ```

- Attach a console producer to a topic by name:

  ```shell
  docker exec -it kafka_kafka_1 kafka-console-producer --broker-list kafka:9092 --topic t1
  ```

- Attach a console consumer from a group `g1` to a topic by name:

  ```shell
  docker exec -t kafka_kafka_1 kafka-console-consumer --bootstrap-server kafka:9092 --group g1 --topic t1
  ```
A consumer cannot control what the next message from a producer is going to be. Also, you cannot reset an offset for a group alone: you must provide a group + topic + partition combination.
- Describe offsets for group `g1`:

  ```shell
  docker exec -t kafka_kafka_1 kafka-consumer-groups --bootstrap-server kafka:9092 --group g1 --describe
  ```

- Reset the offset of topic `t1` partition `0` to earliest for group `g1` (you can set offsets for many partitions with `t1:0,1,2`):

  ```shell
  docker exec -t kafka_kafka_1 kafka-consumer-groups --bootstrap-server kafka:9092 --reset-offsets --topic t1:0 --to-earliest --group g1 --execute
  ```

- Reset the offset of topic `t1` partition `0` to `1` for group `g1`:

  ```shell
  docker exec -t kafka_kafka_1 kafka-consumer-groups --bootstrap-server kafka:9092 --reset-offsets --topic t1:0 --to-offset 1 --group g1 --execute
  ```
Run `docker-compose up` to get all 4 services up (or `docker-compose up schema-registry rest-proxy` after the basic setup). Now you can use the REST API (available at `localhost:8091`) to interact with Kafka (API reference).
- List topics:

  ```shell
  curl -L -X GET 'localhost:8091/topics'
  ```

- Describe a topic by name:

  ```shell
  curl -L -X GET 'localhost:8091/topics/t1'
  ```

- Produce some messages to `t1`. Send the messages `confluent`, `kafka` and `logs` as Base64-encoded strings:

  ```shell
  curl -L -X POST 'localhost:8091/topics/t1' \
    -H 'Content-Type: application/vnd.kafka.binary.v2+json' \
    --data-raw '{"records":[{"value":"Y29uZmx1ZW50"},{"value":"a2Fma2E="},{"value":"bG9ncw=="}]}'
  ```
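The Base64 payloads above can be produced with the standard `base64` utility (a quick sketch using plain shell, no extra tooling assumed):

```shell
# Encode each message before embedding it in the JSON payload;
# -n keeps echo from appending a trailing newline to the encoded value.
echo -n 'confluent' | base64   # Y29uZmx1ZW50
echo -n 'kafka' | base64       # a2Fma2E=
echo -n 'logs' | base64        # bG9ncw==

# Decode a value coming back from the REST Proxy:
echo -n 'Y29uZmx1ZW50' | base64 --decode   # confluent
```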
The examples in both schema sections below produce the following records:

```json
[
    {"id": 2, "name": "testuser2"},
    {"id": 10, "name": "testuser10"},
    {"id": 42, "name": "deusexmachina"},
    {"id": 133, "name": "me"}
]
```

Using JSON schemas
Schema:

```json
{"type": "object", "properties": {"id": {"type": "integer"}, "name": {"type": "string"}}}
```

- Produce some messages with the JSON schema to `tjson` (notice that you will receive something like `"value_schema_id": 1` in the response message):

  ```shell
  curl -L -X POST 'localhost:8091/topics/tjson' \
    -H 'Content-Type: application/vnd.kafka.jsonschema.v2+json' \
    --data-raw '{"value_schema":"{\"type\":\"object\",\"properties\":{\"id\":{\"type\":\"integer\"},\"name\":{\"type\":\"string\"}}}","records":[{"value":{"id":10,"name":"testuser10"}},{"value":{"id":42,"name":"deusexmachina"}}]}'
  ```
- Produce more messages using the schema ID (`1`):

  ```shell
  curl -L -X POST 'localhost:8091/topics/tjson' \
    -H 'Content-Type: application/vnd.kafka.jsonschema.v2+json' \
    --data-raw '{"value_schema_id":1,"records":[{"value":{"id":2,"name":"testuser2"}},{"value":{"id":133,"name":"me"}}]}'
  ```
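Schema IDs are assigned by Schema Registry, so you can inspect them there as well. A minimal sketch against the Schema Registry REST API, assuming the registry is reachable at `localhost:8090` as configured above (the actual ID values depend on registration order):

```shell
# List all subjects known to Schema Registry
# (the REST Proxy registers value schemas under "<topic>-value").
curl -L -X GET 'localhost:8090/subjects'

# Fetch a schema by its global ID, e.g. the "value_schema_id": 1
# returned when producing to tjson.
curl -L -X GET 'localhost:8090/schemas/ids/1'
```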
Using Avro schemas

Schema:

```json
{"type": "record", "name": "user", "fields": [{"name": "id", "type": "int"}, {"name": "name", "type": "string"}]}
```

- Produce some messages with the Avro schema to `tavro` (notice that you will receive something like `"value_schema_id": 2` in the response message):

  ```shell
  curl -L -X POST 'localhost:8091/topics/tavro' \
    -H 'Content-Type: application/vnd.kafka.avro.v2+json' \
    --data-raw '{"value_schema":"{\"type\":\"record\",\"name\":\"user\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":\"string\"}]}","records":[{"value":{"id":10,"name":"testuser10"}},{"value":{"id":42,"name":"deusexmachina"}}]}'
  ```
- Produce more messages using the schema ID (`2`):

  ```shell
  curl -L -X POST 'localhost:8091/topics/tavro' \
    -H 'Content-Type: application/vnd.kafka.avro.v2+json' \
    --data-raw '{"value_schema_id":2,"records":[{"value":{"id":2,"name":"testuser2"}},{"value":{"id":133,"name":"me"}}]}'
  ```
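The REST Proxy can also consume messages. A sketch using its v2 consumer API (the group name `cg1` and instance name `ci1` are arbitrary names chosen for this example):

```shell
# 1. Create a consumer instance in group cg1 that reads Avro messages
#    from the beginning of the topic.
curl -L -X POST 'localhost:8091/consumers/cg1' \
  -H 'Content-Type: application/vnd.kafka.v2+json' \
  --data-raw '{"name":"ci1","format":"avro","auto.offset.reset":"earliest"}'

# 2. Subscribe the instance to the tavro topic.
curl -L -X POST 'localhost:8091/consumers/cg1/instances/ci1/subscription' \
  -H 'Content-Type: application/vnd.kafka.v2+json' \
  --data-raw '{"topics":["tavro"]}'

# 3. Poll for records (repeat this call until records arrive).
curl -L -X GET 'localhost:8091/consumers/cg1/instances/ci1/records' \
  -H 'Accept: application/vnd.kafka.avro.v2+json'

# 4. Delete the consumer instance when done.
curl -L -X DELETE 'localhost:8091/consumers/cg1/instances/ci1'
```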