Skip to content

Commit

Permalink
Merge branch 'secure'
Browse files Browse the repository at this point in the history
* secure:
  secured the kafka cluster
  • Loading branch information
markush81 committed Jun 2, 2018
2 parents a94f5c2 + 0b415ee commit c9d679d
Show file tree
Hide file tree
Showing 11 changed files with 152 additions and 22 deletions.
12 changes: 6 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ The result if everything goes fine should be
| Name |  |
|:-- |:-- |
|Zookeeper|kafka-1:2181,kafka-2:2181,kafka-3:2181|
|Kafka Brokers|kafka-1:9092,kafka-2:9092,kafka-3:9092|
|Kafka Brokers|kafka-1:9093,kafka-2:9093,kafka-3:9093|
|Kibana|[http://mon-1:5601](http://mon-1:5601)|
|Elasticsearch|[http://mon-1:9200](http://mon-1:9200)|
|Grafana|[http://mon-2:3000](http://mon-2:3000)|
Expand Down Expand Up @@ -104,7 +104,7 @@ WatchedEvent state:SyncConnected type:None path:null
lucky:~ markus$ vagrant ssh kafka-1
[vagrant@kafka-1 ~]$ kafka-topics.sh --create --zookeeper kafka-1:2181 --replication-factor 2 --partitions 6 --topic sample
Created topic "sample".
[vagrant@kafka-1 ~]$ kafka-topics.sh --zookeeper kafka-1 --topic sample --describe
[vagrant@kafka-1 ~]$ kafka-topics.sh --zookeeper kafka-1:2181 --topic sample --describe
Topic:sample PartitionCount:6 ReplicationFactor:2 Configs:
Topic: sample Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: sample Partition: 1 Leader: 2 Replicas: 2,3 Isr: 2,3
Expand All @@ -117,21 +117,21 @@ Topic:sample PartitionCount:6 ReplicationFactor:2 Configs:
### Producer

```bash
[vagrant@kafka-1 ~]$ kafka-console-producer.sh --broker-list kafka-1:9092,kafka-3:9092 --topic sample
[2017-04-22 15:27:41,035] WARN Removing server kafka-1::9092 from bootstrap.servers as DNS resolution failed for kafka-1: (org.apache.kafka.clients.ClientUtils)
[vagrant@kafka-1 ~]$ kafka-console-producer.sh --broker-list kafka-1:9093,kafka-3:9093 --producer.config /vagrant/exchange/ssl-client/client-ssl.properties --topic sample
[2017-04-22 15:27:41,035] WARN Removing server kafka-1::9093 from bootstrap.servers as DNS resolution failed for kafka-1: (org.apache.kafka.clients.ClientUtils)
Hey, is Kafka up and running?
```

### Consumer

```bash
[vagrant@kafka-1 ~]$ kafka-console-consumer.sh --bootstrap-server kafka-1:9092,kafka-3:9092 --topic sample --from-beginning
[vagrant@kafka-1 ~]$ kafka-console-consumer.sh --bootstrap-server kafka-1:9093,kafka-3:9093 --consumer.config /vagrant/exchange/ssl-client/client-ssl.properties --topic sample --from-beginning
Hey, is Kafka up and running?
```

### Producer Perf Test

```bash
[vagrant@kafka-1 ~]$ kafka-producer-perf-test.sh --producer-props bootstrap.servers="kafka-1:9092,kafka-2:9092,kafka-3:9092" --topic sample --num-records 2000 --throughput 100 --record-size 256
[vagrant@kafka-1 ~]$ kafka-producer-perf-test.sh --producer.config /vagrant/exchange/ssl-client/client-ssl.properties --producer-props bootstrap.servers="kafka-1:9093,kafka-2:9093,kafka-3:9093" --topic sample --num-records 2000 --throughput 100 --record-size 256

```
5 changes: 4 additions & 1 deletion Vagrantfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ Vagrant.configure("2") do |config|
config.vm.synced_folder "exchange", "/home/vagrant/exchange", create: true
config.vm.synced_folder "ansible", "/home/vagrant/ansible", create: true

# After a `vagrant destroy`, run an inline command that removes the
# exchange/ssl and exchange/ssl-client directories (paths are relative).
config.trigger.after :destroy do |trigger|
trigger.run = { inline: 'rm -rf exchange/ssl && rm -rf exchange/ssl-client'}
end

config.vm.provision :shell, inline: "ifup eth1", run: "always"

config.vm.define "mon-1" do |mon|
Expand Down Expand Up @@ -103,5 +107,4 @@ Vagrant.configure("2") do |config|
end
end
end

end
1 change: 1 addition & 0 deletions ansible/cluster.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
remote_user: vagrant
serial: 33%
roles:
- secure
- zookeeper

- hosts: kafka
Expand Down
7 changes: 7 additions & 0 deletions ansible/inventories/vbox/group_vars/all
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
# Base paths under /vagrant
download: "/vagrant/download"
exchange: "/vagrant/exchange"

# Locations of the CA files and the client SSL material, both below `exchange`
ssl_ca_dir: "{{ exchange }}/ssl"
ssl_client_dir: "{{ exchange }}/ssl-client"

# System paths
usr_local: "/usr/local"
etc_profiles: "/etc/profile.d"
system_units: "/etc/systemd/system"

# jmxtrans and prometheus_jmx destinations
jmxtrans_dest: "/usr/local/jmxtrans"
prometheus_jmx_dest: "/usr/local/prometheus_jmx"

# Passwords for the keystores, private keys, truststores and the CA key.
# NOTE(review): these are stored in plain text; acceptable only for a local
# sandbox - consider ansible-vault before reusing this elsewhere.
keystore_pwd: "keystore-secret"
key_pwd: "key-secret"
truststore_pwd: "truststore-secret"
ca_pwd: "ca-secret"
8 changes: 4 additions & 4 deletions ansible/roles/grafana/dashboards/kafka_graphite.json
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@
{
"$$hashKey": "object:511",
"refId": "A",
"target": "sumSeries(servers.*.kafka.ActiveControllerCount)"
"target": "sumSeries(servers.*.kafka.KafkaController.ActiveControllerCount.Value)"
}
],
"thresholds": "0.01,1",
Expand Down Expand Up @@ -178,7 +178,7 @@
{
"$$hashKey": "object:973",
"refId": "A",
"target": "sumSeries(servers.*.kafka.OfflinePartitionsCount)"
"target": "sumSeries(servers.*.kafka.KafkaController.OfflinePartitionsCount.Value)"
}
],
"thresholds": "0.01,0.01",
Expand Down Expand Up @@ -257,7 +257,7 @@
{
"$$hashKey": "object:1055",
"refId": "A",
"target": "sumSeries(servers.*.kafka.UnderReplicatedPartitions, *)"
"target": "sumSeries(servers.*.kafka.ReplicaManager.UnderReplicatedPartitions.Value, *)"
}
],
"thresholds": "0.01,0.01",
Expand Down Expand Up @@ -336,7 +336,7 @@
{
"$$hashKey": "object:1137",
"refId": "A",
"target": "sumSeries(servers.*.kafka.UnderMinIsrPartitionCount)"
"target": "sumSeries(servers.*.kafka.ReplicaManager.UnderMinIsrPartitionCount.Value)"
}
],
"thresholds": "0.01,0.01",
Expand Down
14 changes: 10 additions & 4 deletions ansible/roles/kafka/tasks/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,15 @@
daemon_reload: yes
when: kafka_config.changed or kafka_config_jmx.changed or kafka_config_prometheus.changed or kafka_service.changed or kafka_env.changed

- name: create sample topic
- name: wait until kafka has started
wait_for:
host: "{{ inventory_hostname }}"
port: 9093
delay: 10

- name: create {{ exchange }}/client-ssl.properties
become: yes
command: "{{ kafka_home }}/bin/kafka-topics.sh --create --zookeeper {{ groups['zookeeper'][0] }}:2181 --replication-factor 2 --partitions 6 --topic sample"
template:
src: templates/client-ssl.properties.j2
dest: "{{ ssl_client_dir }}/client-ssl.properties"
run_once: true
ignore_errors: true
when: kafka_install.changed
7 changes: 7 additions & 0 deletions ansible/roles/kafka/templates/client-ssl.properties.j2
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Client-side SSL settings, rendered by Ansible from client-ssl.properties.j2.
# Clients talk to the brokers over SSL.
security.protocol=SSL
# Truststore holding the CA certificate (imported under alias CARoot by the secure role).
ssl.truststore.location={{ ssl_client_dir }}/client.truststore.jks
ssl.truststore.password={{ truststore_pwd }}
# Keystore holding the client key pair and its signed certificate.
ssl.keystore.location={{ ssl_client_dir }}/client.keystore.jks
ssl.keystore.password={{ keystore_pwd }}
ssl.key.password={{ key_pwd }}
# Same protocol restriction as the broker's server.properties.
ssl.enabled.protocols=TLSv1.2
24 changes: 19 additions & 5 deletions ansible/roles/kafka/templates/jmxtrans-agent.xml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,30 @@
<query objectName="java.lang:type=GarbageCollector,name=*" attributes="CollectionTime,CollectionCount" resultAlias="jvm.%name%.#attribute#"/>
<query objectName="java.lang:type=ClassLoading" attribute="LoadedClassCount" resultAlias="jvm.#attribute#"/>
<query objectName="java.lang:type=Threading" attribute="ThreadCount" resultAlias="jvm.#attribute#"/>

<!-- Kafka -->
<query objectName="kafka.controller:type=KafkaController,name=*" attribute="Value" resultAlias="%name%"/>
<query objectName="kafka.server:type=ReplicaManager,name=*" attribute="Value" resultAlias="%name%"/>
<query objectName="kafka.controller:type=KafkaController,name=*" attribute="Value" resultAlias="KafkaController.%name%.#attribute#"/>

<query objectName="kafka.server:type=ReplicaManager,name=FailedIsrUpdatesPerSec" attribute="Count" resultAlias="ReplicaManager.%name%.#attribute#"/>
<query objectName="kafka.server:type=ReplicaManager,name=IsrExpandsPerSec" attribute="Count" resultAlias="ReplicaManager.%name%.#attribute#"/>
<query objectName="kafka.server:type=ReplicaManager,name=IsrShrinksPerSec" attribute="Count" resultAlias="ReplicaManager.%name%.#attribute#"/>
<query objectName="kafka.server:type=ReplicaManager,name=PartitionCount" attribute="Value" resultAlias="ReplicaManager.%name%.#attribute#"/>
<query objectName="kafka.server:type=ReplicaManager,name=UnderMinIsrPartitionCount" attribute="Value" resultAlias="ReplicaManager.%name%.#attribute#"/>
<query objectName="kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions" attribute="Value" resultAlias="ReplicaManager.%name%.#attribute#"/>
<query objectName="kafka.server:type=ReplicaManager,name=LeaderCount" attribute="Value" resultAlias="ReplicaManager.%name%.#attribute#"/>
<query objectName="kafka.server:type=ReplicaManager,name=OfflineReplicaCount" attribute="Value" resultAlias="ReplicaManager.%name%.#attribute#"/>

<query objectName="kafka.server:type=BrokerTopicMetrics,name=*,topic=*" attributes="OneMinuteRate,Count,MeanRate" resultAlias="BrokerTopicMetrics.%name%.%topic%.#attribute#"/>

<query objectName="kafka.server:type=FetcherLagMetrics,name=ConsumerLag,clientId=*,topic=*,partition=*" attribute="Value" resultAlias="FetcherLagMetrics.%name%.%clientId%.%topic%.%partition%"/>

<query objectName="kafka.network:type=RequestMetrics,name=*,request=Produce" attribute="MeanRate" resultAlias="RequestMetrics.%name%.%request%.#attribute#"/>
<query objectName="kafka.network:type=RequestMetrics,name=*,request=FetchConsumer" attribute="MeanRate" resultAlias="RequestMetrics.%name%.%request%.#attribute#"/>
<query objectName="kafka.network:type=RequestMetrics,name=*,request=FetchFollower" attribute="MeanRate" resultAlias="RequestMetrics.%name%.%request%.#attribute#"/>
<query objectName="kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce" attributes="Count,Max,99thPercentile" resultAlias="RequestMetrics.%name%.%request%.#attribute#"/>
<query objectName="kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchConsumer" attributes="Count,Max,99thPercentile" resultAlias="RequestMetrics.%name%.%request%.#attribute#"/>
<query objectName="kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchFollower" attributes="Count,Max,99thPercentile" resultAlias="RequestMetrics.%name%.%request%.#attribute#"/>

<query objectName="kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce" attributes="Count,MeanRate" resultAlias="RequestMetrics.%name%.%request%.#attribute#"/>
<query objectName="kafka.network:type=RequestMetrics,name=RequestsPerSec,request=FetchConsumer" attributes="Count,MeanRate" resultAlias="RequestMetrics.%name%.%request%.#attribute#"/>
<query objectName="kafka.network:type=RequestMetrics,name=RequestsPerSec,request=FetchFollower" attributes="Count,MeanRate" resultAlias="RequestMetrics.%name%.%request%.#attribute#"/>
</queries>
<outputWriter class="org.jmxtrans.agent.GraphitePlainTextTcpOutputWriter">
<host>{{ groups['grafana'][0] }}</host>
Expand Down
18 changes: 16 additions & 2 deletions ansible/roles/kafka/templates/server.properties.j2
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@ auto.create.topics.enable=false
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://0.0.0.0:9092
listeners=SSL://0.0.0.0:9093

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=PLAINTEXT://{{ inventory_hostname }}:9092
advertised.listeners=SSL://{{ inventory_hostname }}:9093

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
Expand Down Expand Up @@ -112,6 +112,20 @@ log.segment.bytes=1073741824
# to the retention policies
log.retention.check.interval.ms=300000

############################# Authentication and SSL ###########################

ssl.truststore.location=/usr/local/server.truststore.jks
ssl.truststore.password={{ truststore_pwd }}
ssl.keystore.location=/usr/local/server.keystore.jks
ssl.keystore.password={{ keystore_pwd }}
ssl.key.password={{ key_pwd }}

security.inter.broker.protocol=SSL

ssl.endpoint.identification.algorithm=HTTPS
ssl.client.auth=required
ssl.enabled.protocols=TLSv1.2

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
Expand Down
2 changes: 2 additions & 0 deletions ansible/roles/secure/defaults/main.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
---
# Certificate validity in days; the role's tasks pass it to
# `keytool -validity` and to `openssl ... -days`.
validity: 365
76 changes: 76 additions & 0 deletions ansible/roles/secure/tasks/main.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
---
# Make sure the directories for the client SSL material and for the CA files exist.
- name: generate client folders
  file:
    path: "{{ item }}"
    state: directory
    # Quoted so the permission bits reach the module as an octal string
    # instead of depending on how the YAML parser reads a leading-zero number.
    mode: "0755"
  with_items:
    - "{{ ssl_client_dir }}"
    - "{{ ssl_ca_dir }}"

# Generate an RSA key pair under the alias "localhost" in the broker keystore
# and in the client keystore. The certificate carries the current host name as
# a DNS subject alternative name. `creates` skips a keystore that already exists.
# `command` replaces `shell`: the invocation needs no pipes, redirects or
# variable expansion, so nothing has to pass through a shell.
- name: generate keystores
  become: true
  command: >-
    keytool -keystore {{ item.keystore }} -alias localhost
    -validity {{ validity }} -genkey
    -storepass {{ keystore_pwd }} -keypass {{ key_pwd }}
    -keyalg RSA
    -dname "CN=kafka,OU=org,O=org,L=home,ST=Bavaria,C=DE"
    -ext SAN=DNS:{{ inventory_hostname }}
  args:
    creates: "{{ item.keystore }}"
  with_items:
    - keystore: /usr/local/server.keystore.jks
    - keystore: "{{ ssl_client_dir }}/client.keystore.jks"

# Create a self-signed CA: the private key goes to ca-key and the certificate
# to ca-cert, both in ssl_ca_dir. The task is limited to one host per run
# (run_once) and is skipped when ca-cert already exists.
# NOTE(review): only ca-cert is checked; a missing ca-key next to an existing
# ca-cert would not be regenerated.
# `command` replaces `shell` because no shell features are used.
- name: create CA
  become: true
  command: >-
    openssl req -new -x509
    -keyout {{ ssl_ca_dir }}/ca-key
    -out {{ ssl_ca_dir }}/ca-cert
    -passout pass:{{ ca_pwd }}
    -days {{ validity }}
    -subj "/C=DE/ST=Bavaria/L=home/O=org/OU=org/CN=kafka"
  args:
    creates: "{{ ssl_ca_dir }}/ca-cert"
  run_once: true

# Trust the CA: add ca-cert under the alias CARoot to the broker truststore
# and to the client truststore, creating a store if it does not exist yet.
- name: import CA into truststores
  become: true
  java_cert:
    state: present
    cert_alias: CARoot
    cert_path: "{{ ssl_ca_dir }}/ca-cert"
    keystore_path: "{{ item }}"
    keystore_pass: "{{ truststore_pwd }}"
    keystore_create: true
  with_items:
    - /usr/local/server.truststore.jks
    - "{{ ssl_client_dir }}/client.truststore.jks"

# Export a certificate signing request for the "localhost" key of each
# keystore. `creates` skips a CSR file that already exists.
# `command` replaces `shell` because no shell features are used.
- name: create CSR
  become: true
  command: >-
    keytool -keystore {{ item.keystore }} -alias localhost -certreq
    -file {{ item.cert_csr }}
    -storepass {{ keystore_pwd }} -keypass {{ key_pwd }}
    -ext SAN=DNS:{{ inventory_hostname }}
  args:
    creates: "{{ item.cert_csr }}"
  with_items:
    - keystore: /usr/local/server.keystore.jks
      cert_csr: /tmp/cert-csr
    - keystore: "{{ ssl_client_dir }}/client.keystore.jks"
      cert_csr: "{{ ssl_client_dir }}/cert-csr"

# Write the subjectAltName extension line for this host into /tmp/san.conf
# (the file is created when missing); the signing task passes it via -extfile.
- name: create SAN conf for signing
  lineinfile:
    path: /tmp/san.conf
    line: "subjectAltName=DNS:{{ inventory_hostname }}"
    state: present
    create: true

# Sign the broker CSR and the client CSR with the CA key, adding the SAN
# extension from /tmp/san.conf. `creates` skips a certificate that already
# exists. The result is registered as `signed` so the import task can tell
# whether anything new was signed.
# `command` replaces `shell` because no shell features are used.
- name: sign certificates
  become: true
  command: >-
    openssl x509 -req
    -CAkey {{ ssl_ca_dir }}/ca-key
    -CA {{ ssl_ca_dir }}/ca-cert
    -days {{ validity }} -CAcreateserial
    -in {{ item.cert_csr }}
    -passin pass:{{ ca_pwd }}
    -out {{ item.cert_file_signed }}
    -extfile /tmp/san.conf
  args:
    creates: "{{ item.cert_file_signed }}"
  with_items:
    - cert_csr: /tmp/cert-csr
      cert_file_signed: /tmp/cert-signed
    - cert_csr: "{{ ssl_client_dir }}/cert-csr"
      cert_file_signed: "{{ ssl_client_dir }}/cert-signed"
  register: signed

# Import the CA certificate (alias CARoot) and then the signed certificate
# (alias localhost) into the broker keystore and the client keystore.
# Runs only when the signing task reported a change.
# `command` replaces `shell` because no shell features are used.
- name: import signed certificates into keystores
  become: true
  command: >-
    keytool -keystore {{ item.keystore }} -alias {{ item.alias }}
    -import -file {{ item.path }} -noprompt
    -storepass {{ keystore_pwd }} -keypass {{ key_pwd }}
  with_items:
    - alias: CARoot
      path: "{{ ssl_ca_dir }}/ca-cert"
      keystore: /usr/local/server.keystore.jks
    - alias: localhost
      path: /tmp/cert-signed
      keystore: /usr/local/server.keystore.jks
    - alias: CARoot
      path: "{{ ssl_ca_dir }}/ca-cert"
      keystore: "{{ ssl_client_dir }}/client.keystore.jks"
    - alias: localhost
      path: "{{ ssl_client_dir }}/cert-signed"
      keystore: "{{ ssl_client_dir }}/client.keystore.jks"
  when: signed.changed
  # Deliberately best-effort: the client store was already filled by the first
  # host, so importing into it again on later hosts fails.
  # NOTE(review): this also hides genuine import failures on the broker keystore.
  ignore_errors: true

0 comments on commit c9d679d

Please sign in to comment.