This workshop illustrates how to use Confluent Cloud on GCP, using the following components:
- Google Cloud Platform
- Confluent Cloud Professional
- Confluent Platform (for local testing)
- Confluent Schema Registry
- Kafka Connect
- Confluent KSQL
- Confluent Ansible scripts
- Kafkacat
TODO
- Initialize a cluster in Confluent Cloud, and get API key/secret
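The API key/secret end up in a client configuration file that later steps reference as `~/.ccloud/config`. A rough sketch of its contents (the bootstrap endpoint shown is the one used later in this workshop; substitute your own cluster endpoint and credentials). Note the property-name difference between clients: Java tools such as `kafka-avro-console-consumer` use `sasl.jaas.config`, while kafkacat/librdkafka expects `sasl.username`/`sasl.password`:

```
# ~/.ccloud/config -- sketch only, fill in your own endpoint and API key/secret
bootstrap.servers=pkc-l9v0e.us-central1.gcp.confluent.cloud:9092
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
# Java clients (e.g. kafka-avro-console-consumer):
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<api-key>" password="<api-secret>";
# kafkacat/librdkafka equivalents: sasl.mechanisms=PLAIN, sasl.username=<api-key>, sasl.password=<api-secret>
```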
- Create some GCP hosts; this workshop uses two n1-standard-4 instances (4 vCPUs, 15 GB memory each)
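A sketch of creating the instances with the gcloud CLI (instance names, zone, and image are assumptions, adjust to your project):

```
$ gcloud compute instances create cp-workshop-1 cp-workshop-2 \
    --machine-type n1-standard-4 \
    --zone us-east1-b \
    --image-family ubuntu-1804-lts --image-project ubuntu-os-cloud
```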
- Clone the Confluent Ansible repository and check out the `ccloud-profiles` branch:

```
$ git clone https://github.com/cjmatta/cp-ansible
$ cd cp-ansible
$ git checkout ccloud-profiles
```
- Edit `hosts.gcp-workshop.yml` and `gcp-workshop.yml` with your hosts, API info, and the roles you want installed on each host, then copy them into the `cp-ansible` directory.
- Install Confluent Platform components on the GCP hosts:
```
$ ansible-playbook --private-key=~/.ssh/google_compute_engine -i hosts.gcp-workshop.yml gcp-workshop.yml
```
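For reference, a very rough sketch of what the `hosts.gcp-workshop.yml` inventory might look like. The actual group names and variables depend on the `ccloud-profiles` branch of cp-ansible, so treat this purely as an illustration (the `connect-distributed` group is the one targeted later in this workshop):

```
# hosts.gcp-workshop.yml -- illustrative only; match the groups your cp-ansible branch expects
all:
  vars:
    ansible_user: <your-gcp-user>
  children:
    schema-registry:
      hosts:
        <host-1-ip>:
    connect-distributed:
      hosts:
        <host-1-ip>:
        <host-2-ip>:
    ksql:
      hosts:
        <host-2-ip>:
```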
- Install Connect plugins:

```
$ ansible-playbook --private-key=~/.ssh/google_compute_engine -i hosts.gcp-workshop.yml install-connectors-playbook.yml
```
- Create the `wikipedia` topic:

```
$ ccloud topic create wikipedia --replication-factor 3 --partitions 3
```
- Edit `submit_wikipedia_irc_config.sh` with the Schema Registry IPs, and then submit the connector:

```
$ ./submit_wikipedia_irc_config.sh <connect-distributed-host>
```
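Under the hood, the script POSTs a connector config to the Connect REST API on the target host. A sketch of the idea, assuming the wikipedia IRC source connector used in the Confluent demos (`com.github.cjmatta.kafka.connect.irc.IrcSourceConnector`); the config keys in your copy of `submit_wikipedia_irc_config.sh` are authoritative:

```
$ curl -s -X POST -H "Content-Type: application/json" \
    http://<connect-distributed-host>:8083/connectors \
    -d '{
      "name": "wikipedia-irc",
      "config": {
        "connector.class": "com.github.cjmatta.kafka.connect.irc.IrcSourceConnector",
        "irc.server": "irc.wikimedia.org",
        "irc.channels": "#en.wikipedia,#fr.wikipedia,#de.wikipedia",
        "kafka.topic": "wikipedia",
        "tasks.max": "1"
      }
    }'
```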
- Check the status of the connector:

```
$ curl http://35.231.187.35:8083/connectors/wikipedia-irc/status | jq .
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   169  100   169    0     0   1631      0 --:--:-- --:--:-- --:--:--  1640
{
  "name": "wikipedia-irc",
  "connector": {
    "state": "RUNNING",
    "worker_id": "10.142.0.5:8083"
  },
  "tasks": [
    {
      "state": "RUNNING",
      "id": 0,
      "worker_id": "10.142.0.6:8083"
    }
  ],
  "type": "source"
}
```
- Use the `kafka-avro-console-consumer` to test that data is flowing into the topic:

```
$ ${CONFLUENT_HOME}/bin/kafka-avro-console-consumer \
    --bootstrap-server pkc-l9v0e.us-central1.gcp.confluent.cloud:9092 \
    --consumer.config ~/.ccloud/config \
    --topic wikipedia \
    --property schema.registry.url=http://<schema-registry-ip>:8081
{"createdat":1536371033912,"wikipage":"Jo Hyun-jae","channel":"#en.wikipedia","username":"2.205.55.98","commitmessage":"","bytechange":36,"diffurl":"https://en.wikipedia.org/w/index.php?diff=858558341&oldid=855644132","isnew":false,"isminor":false,"isbot":false,"isunpatrolled":false}
--- snip ---
```
- Log into one of the KSQL servers
- Start the KSQL CLI:

```
$ sudo ksql http://localhost:8088
```
- Register the `wikipedia` topic as a stream:

```
ksql> create stream wikipediasource with (kafka_topic='wikipedia', value_format='avro');

 Message
----------------
 Stream created
----------------
```
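As an optional spot-check (not part of the original steps), you can confirm that KSQL picked up the Avro schema and that events are streaming; `LIMIT` stops the continuous query after a few rows:

```
ksql> describe wikipediasource;
ksql> select * from wikipediasource limit 3;
```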
This section demonstrates joining a stream of events to a table of dimensions for data enrichment: the stream of Wikipedia edits will be joined to a compacted topic mapping channel -> language.
- Create a compacted topic to capture the lookup data:

```
$ ccloud topic create wikipedia-language-map --partitions 3 --replication-factor 3 --config cleanup.policy=compact
```
- Push the lookup data into the topic using `publish_language_map.sh` (requires kafkacat):

```
$ ./publish_language_map.sh
```
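Roughly, the script pipes `channel:language` pairs into kafkacat in producer mode with `:` as the key delimiter. A sketch of the idea; the script itself is authoritative:

```
$ printf '%s\n' \
    '#en.wikipedia:English' \
    '#fr.wikipedia:French' \
    '#de.wikipedia:German' | \
  kafkacat -F ~/.ccloud/config -b <broker endpoint>:9092 -P -t wikipedia-language-map -K:
```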
- Consume the topic to ensure the data is there:

```
$ kafkacat -F ~/.ccloud/config -b <broker endpoint>:9092 -C -t wikipedia-language-map -o beginning -K:
```
- From the KSQL CLI, register the topic as a TABLE:

```
ksql> create table wikipedialanguages (channel varchar, language varchar) with (kafka_topic='wikipedia-language-map', value_format='delimited', key='channel');

 Message
---------------
 Table created
---------------
```
- Select from the table:

```
ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'
ksql> select * from wikipedialanguages;
1536429549813 | #en.wikipedia | #en.wikipedia | English
1536429549813 | #fr.wikipedia | #fr.wikipedia | French
1536429549813 | #es.wikipedia | #es.wikipedia | Spanish
1536429549813 | #en.wiktionary | #en.wiktionary | English
1536429549813 | #de.wikipedia | #de.wikipedia | German
1536429549813 | #eo.wikipedia | #eo.wikipedia | Esperanto
1536429549813 | #ru.wikipedia | #ru.wikipedia | Russian
1536429549813 | #it.wikipedia | #it.wikipedia | Italian
1536429549813 | #vo.wikipedia | #vo.wikipedia | Volapük
1536429549813 | #zh.wikipedia | #zh.wikipedia | Chinese
1536429549813 | #sd.wikipedia | #sd.wikipedia | Arabic
1536429549813 | #mediawiki.wikipedia | #mediawiki.wikipedia | English
1536429549813 | #commons.wikimedia | #commons.wikimedia | English
1536429549813 | #eu.wikipedia | #eu.wikipedia | English
1536429549813 | #uk.wikipedia | #uk.wikipedia | English
```
- Create an enriched stream by joining the edits to the language table:

```
ksql> create stream wikipediaedits with ( \
  value_format = 'avro' \
) as \
select \
  w.createdat, \
  w.wikipage, \
  w.channel channel, \
  w.username, \
  w.commitmessage, \
  w.bytechange, \
  w.diffurl, \
  w.isnew, \
  w.isminor, \
  w.isbot, \
  w.isunpatrolled, \
  l.language \
from \
  wikipediasource w \
  INNER JOIN wikipedialanguages l on w.channel = l.channel;
```
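As an optional check (not in the original steps), confirm that the join is producing enriched rows:

```
ksql> select * from wikipediaedits limit 5;
```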
- Create a GCP service account and download its authentication JSON keyfile
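A sketch using the gcloud CLI (the service-account name, project id, and IAM role are assumptions; grant whatever BigQuery permissions your connector needs):

```
$ gcloud iam service-accounts create kafka-bigquery-sink --display-name "Kafka BigQuery sink"
$ gcloud projects add-iam-policy-binding <your-project-id> \
    --member serviceAccount:kafka-bigquery-sink@<your-project-id>.iam.gserviceaccount.com \
    --role roles/bigquery.dataEditor
$ gcloud iam service-accounts keys create gbq-keyfile.json \
    --iam-account kafka-bigquery-sink@<your-project-id>.iam.gserviceaccount.com
```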
- Copy the keyfile to the connect hosts:

```
$ ansible -i hosts.gcp-workshop.yml --private-key=~/.ssh/google_compute_engine -m copy -a "src=<path to keyfile> dest=/etc/kafka/gbq-keyfile.json" connect-distributed
```
- Edit `submit_google_big_query_config.sh` with the Schema Registry IPs, and then submit the config:

```
$ ./submit_google_big_query_config.sh <connect host ip>
```
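As with the IRC source, this script POSTs a connector config to the Connect REST API. A rough sketch assuming the WePay BigQuery sink connector (`com.wepay.kafka.connect.bigquery.BigQuerySinkConnector`); the connector name, dataset mapping, and topic name below are assumptions (KSQL typically uppercases the backing topic of a `CREATE STREAM ... AS SELECT`), and config key names vary between connector versions, so defer to the actual script:

```
$ curl -s -X POST -H "Content-Type: application/json" \
    http://<connect host ip>:8083/connectors \
    -d '{
      "name": "wikipedia-bigquery-sink",
      "config": {
        "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
        "topics": "WIKIPEDIAEDITS",
        "project": "<your-project-id>",
        "datasets": ".*=<your-bigquery-dataset>",
        "keyfile": "/etc/kafka/gbq-keyfile.json",
        "autoCreateTables": "true",
        "schemaRetriever": "com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever",
        "schemaRegistryLocation": "http://<schema-registry-ip>:8081",
        "tasks.max": "1"
      }
    }'
```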