Switch #41

Merged
merged 15 commits into from
Nov 3, 2017
8 changes: 7 additions & 1 deletion CHANGELOG.md
@@ -1,10 +1,16 @@
# WaterDrop changelog

## 0.4.0.1 - Unreleased
## 1.0.0 unreleased

- #37 - ack level for producer
- Gem bump
- Ruby 2.4.2 support
- Raw ruby-kafka driver is now replaced with delivery_boy
- Sync and async producers
- Complete update of the API
- Much better validations for config details
- Complete API remodel - please read the new README
- Renamed send_messages to deliver

## 0.4
- Bump to match Karafka
67 changes: 47 additions & 20 deletions Gemfile.lock
@@ -1,40 +1,67 @@
PATH
remote: .
specs:
waterdrop (0.4.0.1)
bundler
connection_pool
dry-configurable (~> 0.6)
waterdrop (1.0.0.alpha2)
delivery_boy (>= 0.2.2)
dry-configurable (~> 0.7)
dry-validation (~> 0.11)
null-logger
rake
ruby-kafka (~> 0.4)
ruby-kafka (>= 0.5)

GEM
remote: https://rubygems.org/
specs:
concurrent-ruby (1.0.5)
connection_pool (2.2.1)
delivery_boy (0.2.2)
king_konf (~> 0.1.8)
ruby-kafka (~> 0.4)
diff-lcs (1.3)
docile (1.1.5)
dry-configurable (0.7.0)
concurrent-ruby (~> 1.0)
dry-container (0.6.0)
concurrent-ruby (~> 1.0)
dry-configurable (~> 0.1, >= 0.1.3)
dry-core (0.3.4)
concurrent-ruby (~> 1.0)
dry-equalizer (0.2.0)
dry-logic (0.4.2)
dry-container (~> 0.2, >= 0.2.6)
dry-core (~> 0.2)
dry-equalizer (~> 0.2)
dry-types (0.12.1)
concurrent-ruby (~> 1.0)
dry-configurable (~> 0.1)
dry-container (~> 0.3)
dry-core (~> 0.2, >= 0.2.1)
dry-equalizer (~> 0.2)
dry-logic (~> 0.4, >= 0.4.2)
inflecto (~> 0.0.0, >= 0.0.2)
dry-validation (0.11.1)
concurrent-ruby (~> 1.0)
dry-configurable (~> 0.1, >= 0.1.3)
dry-core (~> 0.2, >= 0.2.1)
dry-equalizer (~> 0.2)
dry-logic (~> 0.4, >= 0.4.0)
dry-types (~> 0.12.0)
inflecto (0.0.2)
json (2.1.0)
king_konf (0.1.8)
null-logger (0.1.4)
rake (12.1.0)
rspec (3.6.0)
rspec-core (~> 3.6.0)
rspec-expectations (~> 3.6.0)
rspec-mocks (~> 3.6.0)
rspec-core (3.6.0)
rspec-support (~> 3.6.0)
rspec-expectations (3.6.0)
rspec (3.7.0)
rspec-core (~> 3.7.0)
rspec-expectations (~> 3.7.0)
rspec-mocks (~> 3.7.0)
rspec-core (3.7.0)
rspec-support (~> 3.7.0)
rspec-expectations (3.7.0)
diff-lcs (>= 1.2.0, < 2.0)
rspec-support (~> 3.6.0)
rspec-mocks (3.6.0)
rspec-support (~> 3.7.0)
rspec-mocks (3.7.0)
diff-lcs (>= 1.2.0, < 2.0)
rspec-support (~> 3.6.0)
rspec-support (3.6.0)
ruby-kafka (0.4.2)
rspec-support (~> 3.7.0)
rspec-support (3.7.0)
ruby-kafka (0.5.0)
simplecov (0.15.1)
docile (~> 1.1.0)
json (>= 1.8, < 3)
111 changes: 64 additions & 47 deletions README.md
@@ -3,7 +3,14 @@
[![Build Status](https://travis-ci.org/karafka/waterdrop.png)](https://travis-ci.org/karafka/waterdrop)
[![Join the chat at https://gitter.im/karafka/karafka](https://badges.gitter.im/karafka/karafka.svg)](https://gitter.im/karafka/karafka?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)

Gem used to send messages to Kafka in an easy way.
Gem used to send messages to Kafka in an easy way with an extra validation layer. It is a part of the [Karafka](https://github.com/karafka/karafka) ecosystem.

WaterDrop is based on Zendesk's [delivery_boy](https://github.com/zendesk/delivery_boy) gem.

It:

- Is thread safe
- Supports sync and async producers

## Installation

@@ -25,69 +32,79 @@ bundle install

## Setup

WaterDrop has following configuration options:

| Option | Required | Value type | Description |
|--------------------------------|------------|-----------------|------------------------------------------------------------------------------------|
| send_messages | true | Boolean | Should we send messages to Kafka |
| connect_timeout | false | Integer | Number of seconds to wait while connecting to a broker for the first time |
| required_acks | false | Symbol, Integer | [:all, 0, 1] acknowledgement level for Kafka |
| socket_timeout | false | Integer | Number of seconds to wait when reading from or writing to a socket |
| connection_pool.size | true | Integer | Kafka connection pool size |
| connection_pool.timeout | true | Integer | Kafka connection pool timeout |
| kafka.seed_brokers | true | Array<String> | Kafka servers hosts with ports |
| raise_on_failure | true | Boolean | Should we raise an exception when we cannot send message to Kafka - if false will silently ignore failures |
| kafka.ssl_ca_cert | false | String | SSL CA certificate |
| kafka.ssl_ca_cert_file_path | false | String | SSL CA certificate file path |
| kafka.ssl_client_cert | false | String | SSL client certificate |
| kafka.ssl_client_cert_key | false | String | SSL client certificate password |
| kafka.sasl_gssapi_principal | false | String | SASL principal |
| kafka.sasl_gssapi_keytab | false | String | SASL keytab |
| kafka.sasl_plain_authzid | false | String | The authorization identity to use |
| kafka.sasl_plain_username | false | String | The username used to authenticate |
| kafka.sasl_plain_password | false | String | The password used to authenticate |
| producer.compression_codec | false | Symbol | Producer compression codec |
| producer.compression_threshold | false | Integer | Producer compression threshold |

To apply this configuration, you need to use a *setup* method:
WaterDrop is a complex tool with many configuration options. To keep everything organized, they are divided into two groups:

- WaterDrop options - options directly related to the Karafka framework and its components
- Ruby-Kafka driver options - options related to Ruby-Kafka/Delivery boy

To apply all those configuration options, you need to use the ```#setup``` method:

```ruby
WaterDrop.setup do |config|
config.send_messages = true
config.connection_pool.size = 20
config.connection_pool.timeout = 1
config.kafka.seed_brokers = ['localhost:9092']
config.raise_on_failure = true
config.deliver = true
config.kafka.seed_brokers = %w[kafka://localhost:9092]
end
```

This configuration can be placed in *config/initializers* and can vary based on the environment:
### WaterDrop configuration options

| Option | Description |
|-----------------------------|------------------------------------------------------------------|
| client_id | This is how the client will identify itself to the Kafka brokers |
| logger | Logger that we want to use |
| deliver | Should we send messages to Kafka |

### Ruby-Kafka driver and Delivery boy configuration options

**Note:** We've listed here only **the most important** configuration options. If you're interested in all the options, please go to the [config.rb](https://github.com/karafka/waterdrop/blob/master/lib/water_drop/config.rb) file for more details.

**Note:** All the options are subject to validations. In order to check what is and what is not acceptable, please go to the [config.rb validation schema](https://github.com/karafka/waterdrop/blob/master/lib/water_drop/schemas/config.rb) file.

| Option | Description |
|---------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------|
| delivery_interval | The number of seconds between background message deliveries. Disable timer-based background deliveries by setting this to 0. |
| delivery_threshold | The number of buffered messages that will trigger a background message delivery. Disable buffer size based background deliveries by setting this to 0.|
| required_acks | The number of Kafka replicas that must acknowledge messages before they're considered as successfully written. |
| ack_timeout | A timeout executed by a broker when the client is sending messages to it. |
| max_retries | The number of retries when attempting to deliver messages. |
| retry_backoff | The number of seconds to wait after a failed attempt to send messages to a Kafka broker before retrying. |
| max_buffer_bytesize | The maximum number of bytes allowed in the buffer before new messages are rejected. |
| max_buffer_size | The maximum number of messages allowed in the buffer before new messages are rejected. |
| max_queue_size | The maximum number of messages allowed in the queue before new messages are rejected. |
| sasl_plain_username | The username used to authenticate. |
| sasl_plain_password | The password used to authenticate. |

This configuration can also be placed in *config/initializers* and can vary based on the environment:

```ruby
WaterDrop.setup do |config|
config.send_messages = Rails.env.production?
config.connection_pool.size = 20
config.connection_pool.timeout = 1
config.kafka.seed_brokers = [Rails.env.production? ? 'prod-host:9091' : 'localhost:9092']
config.raise_on_failure = Rails.env.production?
config.deliver = Rails.env.production?
config.kafka.seed_brokers = [Rails.env.production? ? 'kafka://prod-host:9091' : 'kafka://localhost:9092']
end
```

## Usage

### Creating and sending standard messages

To send Kafka messages, just create and send messages directly:
To send Kafka messages, just use one of the producers:

```ruby
message = WaterDrop::Message.new('topic', 'message')
message.send!
WaterDrop::SyncProducer.call('message', topic: 'my-topic')

message = WaterDrop::Message.new('topic', { user_id: 1 }.to_json)
message.send!
# or for async

WaterDrop::AsyncProducer.call('message', topic: 'my-topic')
```

message that you want to send should be either binary or stringified (to_s, to_json, etc).
Both ```SyncProducer``` and ```AsyncProducer``` accept the following options:

| Option | Required | Value type | Description |
|-------------------- |----------|----------------|---------------------------------------------------------------------|
| ```topic``` | true | String, Symbol | The Kafka topic that should be written to |
| ```key``` | false | String | The key that should be set on the Kafka message |
| ```partition``` | false | Integer | A specific partition number that should be written to |
| ```partition_key``` | false | String | A string that can be used to deterministically select the partition |

Keep in mind that the message you want to send should be either binary or stringified (to_s, to_json, etc).
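
As a small illustration of the note on stringified payloads, the sketch below serializes a hash to JSON before handing it to a producer. The `event` hash and the `users` topic are made up for the example, and the producer call is guarded so the snippet also runs without the gem loaded.

```ruby
require 'json'

# Illustrative payload; any object works as long as it is stringified first
event = { user_id: 1 }

# Producers expect a String (or binary) value, so serialize explicitly
payload = event.to_json # => '{"user_id":1}'

# Only attempt delivery when WaterDrop is actually loaded and configured
if defined?(WaterDrop::SyncProducer)
  WaterDrop::SyncProducer.call(payload, topic: 'users')
end
```

Serializing at the call site keeps the choice of format (JSON here) visible to the caller instead of hiding it inside the producer.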

## References

@@ -100,7 +117,7 @@ message that you want to send should be either binary or stringified (to_s, to_j
Fork the project.
Make your feature addition or bug fix.
Add tests for it. This is important so we don't break it in future versions unintentionally.
Commit, do not mess with Rakefile, version, or history. (if you want to have your own version, that is fine but bump version in a commit by itself I can ignore when I pull). Send me a pull request. Bonus points for topic branches.
Commit, do not mess with version, or history. (if you want to have your own version, that is fine but bump version in a commit by itself I can ignore when I pull). Send me a pull request. Bonus points for topic branches.

[![coditsu](https://coditsu.io/assets/quality_bar.svg)](https://app.coditsu.io/karafka/repositories/waterdrop)

@@ -111,7 +128,7 @@ Unfortunately, it does not yet support independent forks, however you should be
Please run:

```bash
bundle exec rake
bundle exec rspec
```

to check if everything is in order. After that you can submit a pull request.
8 changes: 0 additions & 8 deletions Rakefile

This file was deleted.

6 changes: 6 additions & 0 deletions config/errors.yml
@@ -0,0 +1,6 @@
en:
errors:
broker_schema?: >
has an invalid format.
Expected schema, host and port number.
Example: kafka://127.0.0.1:9092 or kafka+ssl://127.0.0.1:9092
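
The error message above implies that each broker address must carry a schema, a host and a port. The schema file that implements this check is not part of the diff shown here, so the following is only a minimal sketch of what such a check could look like; `broker_uri_valid?` is a hypothetical helper name, not the gem's predicate.

```ruby
require 'uri'

# Hypothetical helper (not taken from the gem): accepts only addresses such as
# kafka://127.0.0.1:9092 or kafka+ssl://127.0.0.1:9092
def broker_uri_valid?(address)
  uri = URI.parse(address.to_s)
  # All three parts named in the error message must be present
  %w[kafka kafka+ssl].include?(uri.scheme) && !uri.host.nil? && !uri.port.nil?
rescue URI::InvalidURIError
  # Anything that cannot be parsed as a URI is rejected
  false
end

broker_uri_valid?('kafka://127.0.0.1:9092')     # => true
broker_uri_valid?('kafka+ssl://127.0.0.1:9092') # => true
broker_uri_valid?('localhost:9092')             # => false (no kafka schema)
broker_uri_valid?('kafka://localhost')          # => false (no port)
```

This also explains why the README examples in this pull request switch from `localhost:9092` to `kafka://localhost:9092`.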
57 changes: 35 additions & 22 deletions lib/water_drop.rb
@@ -2,49 +2,62 @@

# External components
%w[
rake
rubygems
bundler
logger
pathname
json
kafka
forwardable
connection_pool
delivery_boy
null_logger
dry-configurable
dry-validation
].each { |lib| require lib }

# Internal components
base_path = File.dirname(__FILE__) + '/water_drop'

%w[
version
producer_proxy
pool
config
message
].each { |lib| require "#{base_path}/#{lib}" }

# WaterDrop library
module WaterDrop
class << self
attr_writer :logger

# @return [Logger] logger that we want to use
def logger
@logger ||= NullLogger.new
end
attr_accessor :logger

# Sets up the whole configuration
# @param [Block] block configuration block
def setup(&block)
Config.setup(&block)

DeliveryBoy.logger = self.logger = config.logger

# Recursive lambda for mapping config down to delivery boy
applier = lambda { |db_config, h|
h.each do |k, v|
applier.call(db_config, v) && next if v.is_a?(Hash)
next unless db_config.respond_to?(:"#{k}=")
db_config.public_send(:"#{k}=", v)
end
}

DeliveryBoy.config.tap do |config|
config.brokers = Config.config.kafka.seed_brokers
applier.call(config, Config.config.to_h)
end
end

# @return [WaterDrop::Config] config instance
def config
Config.config
end

# @return [String] root path of this gem
def gem_root
Pathname.new(File.expand_path('../..', __FILE__))
end
end
end

%w[
version
schemas/message_options
schemas/config
config
errors
base_producer
sync_producer
async_producer
].each { |lib| require "#{base_path}/#{lib}" }
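
The recursive lambda in `setup` copies nested WaterDrop settings onto the flat delivery_boy configuration and skips keys the target has no setter for. The sketch below reproduces that idea in isolation. The struct is only a stand-in for `DeliveryBoy.config`, and the setting names are picked for illustration.

```ruby
# Stand-in for DeliveryBoy.config: a flat object with a handful of setters
flat_config_class = Struct.new(:brokers, :required_acks, :max_retries)

# Walks a nested hash and assigns every leaf value the target has a setter for
apply_settings = lambda do |target, settings|
  settings.each do |key, value|
    if value.is_a?(Hash)
      # Descend into nested groups such as :kafka
      apply_settings.call(target, value)
      next
    end

    setter = :"#{key}="
    # Keys unknown to the target (for example :deliver) are silently skipped
    target.public_send(setter, value) if target.respond_to?(setter)
  end
end

nested = {
  client_id: 'my-app',
  deliver: true,
  kafka: { seed_brokers: %w[kafka://localhost:9092], required_acks: 1, max_retries: 2 }
}

flat = flat_config_class.new
# seed_brokers has a different name on the target, so it is mapped by hand
flat.brokers = nested[:kafka][:seed_brokers]
apply_settings.call(flat, nested)
```

Checking `respond_to?` before each assignment lets the WaterDrop-only options live in the same hash without raising `NoMethodError` on the target.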
8 changes: 8 additions & 0 deletions lib/water_drop/async_producer.rb
@@ -0,0 +1,8 @@
# frozen_string_literal: true

# WaterDrop library
module WaterDrop
# Async producer for messages
AsyncProducer = Class.new(BaseProducer)
AsyncProducer.method_name = :deliver_async
end
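
`BaseProducer` itself is not visible in this part of the diff, so the following is only a guess at the pattern behind `Class.new(BaseProducer)` plus a class-level `method_name`: each subclass stores the name of the backend method it dispatches to. All `Sketch*` names and `FakeBackend` are hypothetical; only `deliver` and `deliver_async` mirror the delivery_boy method names.

```ruby
# Hypothetical backend that records calls instead of talking to Kafka
module FakeBackend
  def self.calls
    @calls ||= []
  end

  def self.deliver(value, **options)
    calls << [:deliver, value, options]
  end

  def self.deliver_async(value, **options)
    calls << [:deliver_async, value, options]
  end
end

# Sketch of a base class whose subclasses differ only in the backend method used
class SketchBaseProducer
  class << self
    # Name of the backend method this producer dispatches to
    attr_accessor :method_name

    def call(message, **options)
      FakeBackend.public_send(method_name, message.to_s, **options)
    end
  end
end

SketchSyncProducer = Class.new(SketchBaseProducer)
SketchSyncProducer.method_name = :deliver

SketchAsyncProducer = Class.new(SketchBaseProducer)
SketchAsyncProducer.method_name = :deliver_async

SketchAsyncProducer.call('message', topic: 'my-topic')
```

Because `method_name` is stored as a class-level instance variable, setting it on one subclass does not leak into the base class or into sibling producers.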