
Commit 0896f42

Merge pull request #538 from zendesk/dasch/allow-positional-seed-brokers
Allow passing in the seed brokers in a positional argument
2 parents fa5c5af + d7d3d89 commit 0896f42

11 files changed: +36 -43 lines
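The change in one line: `Kafka.new` now accepts the seed broker list as its first positional argument, while the existing `seed_brokers:` keyword keeps working. A minimal before/after sketch (broker addresses are placeholders):

```ruby
require "kafka"

# Keyword style, still supported:
kafka = Kafka.new(seed_brokers: ["kafka1:9092", "kafka2:9092"], client_id: "my-application")

# Positional style introduced by this PR:
kafka = Kafka.new(["kafka1:9092", "kafka2:9092"], client_id: "my-application")
```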

README.md

Lines changed: 13 additions & 21 deletions
@@ -115,13 +115,10 @@ A client must be initialized with at least one Kafka broker, from which the enti
 ```ruby
 require "kafka"
 
-kafka = Kafka.new(
-  # At least one of these nodes must be available:
-  seed_brokers: ["kafka1:9092", "kafka2:9092"],
-
-  # Set an optional client id in order to identify the client to Kafka:
-  client_id: "my-application",
-)
+# The first argument is a list of "seed brokers" that will be queried for the full
+# cluster topology. At least one of these *must* be available. `client_id` is
+# used to identify this client in logs and metrics. It's optional but recommended.
+kafka = Kafka.new(["kafka1:9092", "kafka2:9092"], client_id: "my-application")
 ```
 
 ### Producing Messages to Kafka
@@ -430,10 +427,7 @@ require "kafka"
 
 # Configure the Kafka client with the broker hosts and the Rails
 # logger.
-$kafka = Kafka.new(
-  seed_brokers: ["kafka1:9092", "kafka2:9092"],
-  logger: Rails.logger,
-)
+$kafka = Kafka.new(["kafka1:9092", "kafka2:9092"], logger: Rails.logger)
 
 # Set up an asynchronous producer that delivers its buffered messages
 # every ten seconds:
@@ -473,7 +467,7 @@ Consuming messages from a Kafka topic with ruby-kafka is simple:
 ```ruby
 require "kafka"
 
-kafka = Kafka.new(seed_brokers: ["kafka1:9092", "kafka2:9092"])
+kafka = Kafka.new(["kafka1:9092", "kafka2:9092"])
 
 kafka.each_message(topic: "greetings") do |message|
   puts message.offset, message.key, message.value
@@ -496,7 +490,7 @@ Using the API is simple:
 ```ruby
 require "kafka"
 
-kafka = Kafka.new(seed_brokers: ["kafka1:9092", "kafka2:9092"])
+kafka = Kafka.new(["kafka1:9092", "kafka2:9092"])
 
 # Consumers with the same group id will form a Consumer Group together.
 consumer = kafka.consumer(group_id: "my-consumer")
@@ -884,10 +878,7 @@ By enabling SSL encryption you can have some confidence that messages can be sen
 In this case you just need to pass a valid CA certificate as a string when configuring your `Kafka` client:
 
 ```ruby
-kafka = Kafka.new(
-  ssl_ca_cert: File.read('my_ca_cert.pem'),
-  # ...
-)
+kafka = Kafka.new(["kafka1:9092"], ssl_ca_cert: File.read('my_ca_cert.pem'))
 ```
 
 Without passing the CA certificate to the client it would be impossible to protect against [man-in-the-middle attacks](https://en.wikipedia.org/wiki/Man-in-the-middle_attack).
@@ -898,10 +889,7 @@ If you want to use the CA certs from your system's default certificate store, you
 can use:
 
 ```ruby
-kafka = Kafka.new(
-  ssl_ca_certs_from_system: true
-  # ...
-)
+kafka = Kafka.new(["kafka1:9092"], ssl_ca_certs_from_system: true)
 ```
 
 This configures the store to look up CA certificates from the system default certificate store on an as needed basis. The location of the store can usually be determined by:
@@ -913,6 +901,7 @@ In order to authenticate the client to the cluster, you need to pass in a certif
 
 ```ruby
 kafka = Kafka.new(
+  ["kafka1:9092"],
   ssl_ca_cert: File.read('my_ca_cert.pem'),
   ssl_client_cert: File.read('my_client_cert.pem'),
   ssl_client_cert_key: File.read('my_client_cert_key.pem'),
@@ -935,6 +924,7 @@ In order to authenticate using GSSAPI, set your principal and optionally your keytab
 
 ```ruby
 kafka = Kafka.new(
+  ["kafka1:9092"],
   sasl_gssapi_principal: 'kafka/kafka.example.com@EXAMPLE.COM',
   sasl_gssapi_keytab: '/etc/keytabs/kafka.keytab',
   # ...
@@ -946,6 +936,7 @@ In order to authenticate using PLAIN, you must set your username and password when
 
 ```ruby
 kafka = Kafka.new(
+  ["kafka1:9092"],
   ssl_ca_cert: File.read('/etc/openssl/cert.pem'), # Optional but highly recommended
   sasl_plain_username: 'username',
   sasl_plain_password: 'password'
@@ -960,6 +951,7 @@ Since 0.11 kafka supports [SCRAM](https://kafka.apache.org/documentation.html#se
 
 ```ruby
 kafka = Kafka.new(
+  ["kafka1:9092"],
   sasl_scram_username: 'username',
   sasl_scram_password: 'password',
   sasl_scram_mechanism: 'sha256',
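Taken together, the README now converges on a single call shape: brokers first, everything else as keyword options. A combined sketch assembled from the hunks above (the certificate path and credentials are placeholders, and mixing these particular options is illustrative, not prescriptive):

```ruby
require "kafka"

kafka = Kafka.new(
  ["kafka1:9092", "kafka2:9092"],            # positional seed brokers
  client_id: "my-application",
  ssl_ca_cert: File.read("my_ca_cert.pem"),  # CA cert guards against MITM
  sasl_plain_username: "username",
  sasl_plain_password: "password"
)
```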

lib/kafka.rb

Lines changed: 8 additions & 2 deletions
@@ -243,8 +243,14 @@ class FailedScramAuthentication < SaslScramError
   #
   # @see Client#initialize
   # @return [Client]
-  def self.new(**options)
-    Client.new(**options)
+  def self.new(seed_brokers = nil, **options)
+    # We allow `seed_brokers` to be passed in either as a positional _or_ as a
+    # keyword argument.
+    if seed_brokers.nil?
+      Client.new(**options)
+    else
+      Client.new(seed_brokers: seed_brokers, **options)
+    end
   end
 end
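Both branches of the shim funnel into `Kafka::Client.new` with a `seed_brokers:` keyword, so the two call styles construct equivalent clients. A small usage sketch (assuming, as the shim implies, that both forms accept the same remaining options):

```ruby
require "kafka"

brokers = ["kafka1:9092", "kafka2:9092"]

# Positional form: forwarded as `seed_brokers:` by the shim above.
client_a = Kafka.new(brokers, client_id: "demo")

# Keyword form: passes straight through to Client.new via **options.
client_b = Kafka.new(seed_brokers: brokers, client_id: "demo")

client_a.class #=> Kafka::Client
client_b.class #=> Kafka::Client
```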

lib/kafka/client.rb

Lines changed: 1 addition & 1 deletion
@@ -481,7 +481,7 @@ def delete_topic(name, timeout: 30)
     # @note This is an alpha level API and is subject to change.
     #
     # @example Describing the cleanup policy config of a topic
-    #   kafka = Kafka.new(seed_brokers: ["kafka1:9092"])
+    #   kafka = Kafka.new(["kafka1:9092"])
     #   kafka.describe_topic("my-topic", ["cleanup.policy"])
     #   #=> { "cleanup.policy" => "delete" }
     #

lib/kafka/consumer.rb

Lines changed: 1 addition & 1 deletion
@@ -20,7 +20,7 @@ module Kafka
   #
   #     require "kafka"
   #
-  #     kafka = Kafka.new(seed_brokers: ["kafka1:9092", "kafka2:9092"])
+  #     kafka = Kafka.new(["kafka1:9092", "kafka2:9092"])
   #
   #     # Create a new Consumer instance in the group `my-group`:
   #     consumer = kafka.consumer(group_id: "my-group")

lib/kafka/producer.rb

Lines changed: 2 additions & 7 deletions
@@ -14,7 +14,7 @@ module Kafka
   #   do it for you, e.g.
   #
   #       # Will instantiate Kafka::Client
-  #       kafka = Kafka.new(...)
+  #       kafka = Kafka.new(["kafka1:9092", "kafka2:9092"])
   #
   #       # Will instantiate Kafka::Producer
   #       producer = kafka.producer
@@ -106,12 +106,7 @@ module Kafka
   #     # cluster to auto-create topics.
   #     topic = "random-messages"
   #
-  #     kafka = Kafka.new(
-  #       seed_brokers: brokers,
-  #       client_id: "simple-producer",
-  #       logger: logger,
-  #     )
-  #
+  #     kafka = Kafka.new(brokers, client_id: "simple-producer", logger: logger)
   #     producer = kafka.producer
   #
   #     begin

spec/functional/batch_consumer_spec.rb

Lines changed: 2 additions & 2 deletions
@@ -9,7 +9,7 @@
     topic = create_random_topic(num_partitions: 15)
 
     Thread.new do
-      kafka = Kafka.new(seed_brokers: kafka_brokers, client_id: "test")
+      kafka = Kafka.new(kafka_brokers, client_id: "test")
       producer = kafka.producer
 
       messages.each do |i|
@@ -23,7 +23,7 @@
 
     threads = 2.times.map do |thread_id|
       t = Thread.new do
-        kafka = Kafka.new(seed_brokers: kafka_brokers, client_id: "test", logger: logger)
+        kafka = Kafka.new(kafka_brokers, client_id: "test", logger: logger)
         consumer = kafka.consumer(group_id: group_id, offset_retention_time: offset_retention_time)
         consumer.subscribe(topic)

spec/functional/client_spec.rb

Lines changed: 1 addition & 1 deletion
@@ -10,7 +10,7 @@
 
   example "listing available topics in the cluster" do
     # Use a clean Kafka instance to avoid hitting caches.
-    kafka = Kafka.new(seed_brokers: KAFKA_BROKERS, logger: LOGGER)
+    kafka = Kafka.new(KAFKA_BROKERS, logger: LOGGER)
 
     topics = kafka.topics

spec/functional/consumer_group_spec.rb

Lines changed: 4 additions & 4 deletions
@@ -6,7 +6,7 @@
     messages = (1..1000).to_a
 
     begin
-      kafka = Kafka.new(seed_brokers: kafka_brokers, client_id: "test")
+      kafka = Kafka.new(kafka_brokers, client_id: "test")
       producer = kafka.producer
 
       messages.each do |i|
@@ -22,7 +22,7 @@
     received_messages = []
 
     consumers = 2.times.map do
-      kafka = Kafka.new(seed_brokers: kafka_brokers, client_id: "test", logger: logger)
+      kafka = Kafka.new(kafka_brokers, client_id: "test", logger: logger)
       consumer = kafka.consumer(group_id: group_id, offset_retention_time: offset_retention_time)
       consumer.subscribe(topic)
       consumer
@@ -60,7 +60,7 @@
     var = ConditionVariable.new
 
     Thread.new do
-      kafka = Kafka.new(seed_brokers: kafka_brokers, client_id: "test")
+      kafka = Kafka.new(kafka_brokers, client_id: "test")
       producer = kafka.producer
 
       messages.each do |i|
@@ -88,7 +88,7 @@
 
     threads = 2.times.map do |thread_id|
       t = Thread.new do
-        kafka = Kafka.new(seed_brokers: kafka_brokers, client_id: "test", logger: logger)
+        kafka = Kafka.new(kafka_brokers, client_id: "test", logger: logger)
         consumer = kafka.consumer(group_id: group_id, offset_retention_time: offset_retention_time)
         consumer.subscribe(topic)

spec/fuzz/consumer_group_spec.rb

Lines changed: 2 additions & 2 deletions
@@ -10,7 +10,7 @@
   before do
     logger.level = Logger::INFO
 
-    kafka = Kafka.new(seed_brokers: kafka_brokers, logger: logger)
+    kafka = Kafka.new(kafka_brokers, logger: logger)
     producer = kafka.producer(max_buffer_size: num_messages)
 
     messages.each do |i|
@@ -66,7 +66,7 @@
   def start_consumer(group_id, result_queue)
     thread = Thread.new do
       kafka = Kafka.new(
-        seed_brokers: kafka_brokers,
+        kafka_brokers,
         logger: logger,
         socket_timeout: 20,
         connect_timeout: 20,

spec/fuzz/producer_spec.rb

Lines changed: 1 addition & 1 deletion
@@ -1,6 +1,6 @@
 describe "Producing a lot of messages with an unreliable cluster", fuzz: true do
   let(:logger) { LOGGER }
-  let(:kafka) { Kafka.new(seed_brokers: kafka_brokers, client_id: "test", logger: logger) }
+  let(:kafka) { Kafka.new(kafka_brokers, client_id: "test", logger: logger) }
   let(:producer) { kafka.producer(max_retries: 20, retry_backoff: 5) }
 
   before do

spec/spec_helper.rb

Lines changed: 1 addition & 1 deletion
@@ -69,7 +69,7 @@ def self.included(base)
     base.class_eval do
       let(:logger) { LOGGER }
       let(:kafka_brokers) { KAFKA_BROKERS }
-      let(:kafka) { Kafka.new(seed_brokers: kafka_brokers, client_id: "test", logger: logger) }
+      let(:kafka) { Kafka.new(kafka_brokers, client_id: "test", logger: logger) }
 
       after { kafka.close rescue nil }
     end
