Write Kafka consumers using a simple model with retry support.
Add this line to your application's Gemfile:

```ruby
gem 'simple_kafka_consumer'
```

And then execute:

```shell
$ bundle
```

Or install it yourself as:

```shell
$ gem install simple_kafka_consumer
```
Write your own consumer class that inherits from `SimpleKafkaConsumer::Consumer`. Specify the `group_name` and `topic_name`, and define the `consume` method, which handles each message received.
```ruby
class MyConsumer < SimpleKafkaConsumer::Consumer
  # the name used for coordinating multiple consumers
  self.group_name = "my-group-name"

  # the kafka topic we're reading from
  self.topic_name = "my-topic-name"

  # handle each message
  def consume(message)
    puts message
  end
end
```

You can have the consumer deserialize the data sent in each message. For example, if you use JSON as your message format:
```ruby
require 'json'

class MyConsumer < SimpleKafkaConsumer::Consumer
  # parse the raw message before it is handed to consume
  def parse(message)
    JSON.parse(message)
  end

  # the message you're consuming is now a parsed JSON object
  def consume(json_object)
    puts json_object['name']
  end
end
```

To create a consumer instance, provide an array of Kafka servers and an array of ZooKeeper servers. You can optionally pass a logger as well.
```ruby
# create a consumer
kafka_servers = ["localhost:9092"]
zookeeper_servers = ["localhost:2181"]
consumer = MyConsumer.new(kafka_servers, zookeeper_servers, logger: nil)

# run the consumer (loops and blocks)
consumer.run
```

This gem is built on the poseidon_cluster gem, and consumers coordinate via ZooKeeper, so you can run many consumers at once. The `group_name` determines which messages have already been processed by the group.
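To picture the coordination, here is a minimal sketch. This is not the gem's internal code: it only illustrates the idea that members of one consumer group (identified by `group_name`) split a topic's partitions among themselves, which poseidon_cluster negotiates through ZooKeeper.

```ruby
# Illustrative only: a round-robin split of partition ids across the
# members of one consumer group. The real assignment is negotiated
# through ZooKeeper by poseidon_cluster.
def assign_partitions(partition_ids, members)
  assignment = Hash.new { |h, k| h[k] = [] }
  partition_ids.each_with_index do |pid, i|
    assignment[members[i % members.size]] << pid
  end
  assignment
end

# four partitions shared by two consumers in the same group
puts assign_partitions([0, 1, 2, 3], ["consumer-a", "consumer-b"])
```

Adding a third consumer to the same group simply reshuffles the partitions; consumers in a *different* group receive every message independently.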
- Fork it ( https://github.com/chingor13/simple_kafka_consumer/fork )
- Create your feature branch (`git checkout -b my-new-feature`)
- Commit your changes (`git commit -am 'Add some feature'`)
- Push to the branch (`git push origin my-new-feature`)
- Create a new Pull Request