kafka-connect-twitter-pipeline

Different Twitter Kafka Connect configurations.

Start Docker containers

Before starting the cluster, create a SQream storage folder inside the local data folder:
docker run --rm -v $(pwd)/data:/mnt sqream:2.15-dev bash -c "./sqream/build/SqreamStorage -C -r /mnt/sqream_storage"
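
To confirm the storage directory was initialized, list the local data folder (the path follows from the volume mount in the command above):
ls $(pwd)/data/sqream_storage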

Create a local Docker network:
docker network create kafka-cluster

Start the cluster:
docker-compose up
This command starts sqreamd, ZooKeeper, two Kafka brokers (broker-1, broker-2), Schema Registry, and Kafka Connect.
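
To follow the startup logs of individual services, pass their names to docker-compose logs (service names here are assumed to match the container names listed above):
docker-compose logs -f broker-1 broker-2 schema-registry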

Tests

First, check the status of the containers:
docker-compose ps
If all containers are up and running, proceed with the following functionality tests.

Test Kafka Broker

Verify that the Kafka broker is running by creating a topic:
docker run --net=kafka-cluster --rm confluentinc/cp-kafka:5.0.0 kafka-topics --create --topic foo --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:32181

List all existing topics:
docker run --net=kafka-cluster --rm confluentinc/cp-kafka:5.0.0 kafka-topics --list --zookeeper zookeeper:32181
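
Optionally, verify the broker end to end by producing and consuming a single message on the test topic, using the same image and network as above:
docker run --net=kafka-cluster --rm -i confluentinc/cp-kafka:5.0.0 bash -c "echo hello | kafka-console-producer --broker-list broker-1:29092 --topic foo"
docker run --net=kafka-cluster --rm confluentinc/cp-kafka:5.0.0 kafka-console-consumer --bootstrap-server broker-1:29092 --topic foo --from-beginning --max-messages 1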

Delete the test topic:
docker run --net=kafka-cluster --rm confluentinc/cp-kafka:5.0.0 kafka-topics --delete --topic foo --zookeeper zookeeper:32181

Test Schema Registry

The following examples are based on the Confluent Schema Registry documentation.

Register a new version of a schema under the subject "Kafka-key":
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"schema": "{\"type\": \"string\"}"}' http://localhost:8081/subjects/Kafka-key/versions

List all subjects:
curl -X GET http://localhost:8081/subjects | jq
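
Fetch the latest registered version of the subject:
curl -X GET http://localhost:8081/subjects/Kafka-key/versions/latest | jq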

Delete subject:
curl -X DELETE http://localhost:8081/subjects/Kafka-key

Test Kafka Connect

Check available connector plugins:
curl localhost:8083/connector-plugins | jq
You should see the Twitter connector plugin among the other built-in connectors:

{
"class": "com.github.jcustenborder.kafka.connect.twitter.TwitterSourceConnector",
"type": "source",
"version": "0.2-SNAPSHOT"
},
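
You can also query the Connect worker's root endpoint for its version information:
curl localhost:8083/ | jq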

Test Kafka cluster listeners

The internal broker listener can't be renamed and remapped, because Schema Registry can't use a remapped listener, as discussed here; it therefore remains PLAINTEXT. The external broker listener is named EXTERNAL and is mapped to the PLAINTEXT security protocol.
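
For reference, this listener layout corresponds to broker environment variables along the following lines in docker-compose.yml (the values here are illustrative, not copied from the repo's compose file):
KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:29092,EXTERNAL://0.0.0.0:9092
KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://broker-1:29092,EXTERNAL://172.17.0.1:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT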

Run a kstet container attached to the kafka-cluster network:
docker run -it --rm --network=kafka-cluster kstet:1.0.0 bash

Test the listeners from inside the container.

Test the internal listeners:
kafkacat -b broker-1:29092 -L
kafkacat -b broker-2:29092 -L

Test the external listeners:
kafkacat -b broker-1:9092 -L
kafkacat -b broker-2:9093 -L

Test the external listener from the host, where 172.17.0.1 is the docker0 IP:
kafkacat -b 172.17.0.1:9093 -L
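
A healthy broker answers with its metadata; the output looks roughly like this (addresses depend on the advertised listeners):
Metadata for all topics (from broker -1: 172.17.0.1:9093/bootstrap):
 2 brokers:
  broker 1 at 172.17.0.1:9092
  broker 2 at 172.17.0.1:9093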

Create Connectors

Create Twitter Source Connector

Before starting the connector check existing topics:
docker run --net=kafka-cluster --rm confluentinc/cp-kafka:5.0.0 kafka-topics --zookeeper zookeeper:32181 --list

If the twitter topic already exists, delete it:
docker run --net=kafka-cluster --rm confluentinc/cp-kafka:5.0.0 kafka-topics --zookeeper zookeeper:32181 --delete --topic twitter

Create the Twitter source connector with a REST API call to Kafka Connect, which listens on port 8083. The OAuth credentials below are placeholders; substitute your own:
echo '{
  "name": "source-twitter",
  "config": {
    "connector.class": "com.github.jcustenborder.kafka.connect.twitter.TwitterSourceConnector",
    "tasks.max": "1",
    "twitter.oauth.consumerKey": "<CONSUMER_KEY>",
    "twitter.oauth.consumerSecret": "<CONSUMER_SECRET>",
    "twitter.oauth.accessToken": "<ACCESS_TOKEN>",
    "twitter.oauth.accessTokenSecret": "<ACCESS_TOKEN_SECRET>",
    "filter.keywords": "programming,java,kafka,scala",
    "kafka.status.topic": "twitter",
    "kafka.delete.topic": "twitter_del",
    "process.deletes": "true"
  }
}' | curl -X POST -d @- http://localhost:8083/connectors --header "Content-Type: application/json"

Check that the connector was created on the server:
curl localhost:8083/connectors

Check if Twitter topic was created:
docker run --net=kafka-cluster --rm confluentinc/cp-kafka:5.0.0 kafka-topics --zookeeper zookeeper:32181 --list | grep twitter

Check connector status:
curl localhost:8083/connectors/source-twitter/status | jq
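
A healthy connector reports a state of RUNNING for the connector and each task, roughly like this (worker_id depends on your setup):
{
  "name": "source-twitter",
  "connector": {
    "state": "RUNNING",
    "worker_id": "connect:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "connect:8083"
    }
  ],
  "type": "source"
}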

Read data from the topic with the Avro console consumer. Note the --property flag at the end; it is required when running kafka-avro-console-consumer over the local Docker network:
docker run --net=kafka-cluster --rm confluentinc/cp-schema-registry:5.0.0 kafka-avro-console-consumer --bootstrap-server broker-1:29092 --topic twitter --from-beginning --property schema.registry.url="http://schema-registry:8081"

Check topic schema:
curl -X GET http://localhost:8081/schemas/ids/2
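
Alternatively, look the schema up by subject; with the default subject naming strategy, the value schema of the twitter topic is registered under twitter-value:
curl -X GET http://localhost:8081/subjects/twitter-value/versions/latest | jq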

Pause connector:
curl -X PUT localhost:8083/connectors/source-twitter/pause

Resume the connector:
curl -X PUT localhost:8083/connectors/source-twitter/resume

Delete connector:
curl -X DELETE localhost:8083/connectors/source-twitter

Create SQream Sink Connector

echo '{"name":"sqream-sink","config":{"connector.class":"JdbcSinkConnector","connection.url":"jdbc:Sqream://192.168.0.212:5000/master","connection.user":"sqream","connection.password":"sqream","tasks.max":"1","topics":"twitter","insert.mode":"insert","table.name.format":"twitter","fields.whitelist":"Id,CreatedAt,Text,Source,Truncated,InReplyToStatusId,InReplyToUserId,Favorited,Retweeted,FavoriteCount"}}' | curl -X POST -d @- http://localhost:8083/connectors --header "content-Type:application/json"

Check if connector was created:
curl localhost:8083/connectors
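
Check the connector status, as with the source connector:
curl localhost:8083/connectors/sqream-sink/status | jq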

Log in to SQream by exec-ing into the running sqreamd container:
docker exec -it twitter-sqream-pipeline_sqreamd_1 bash -c "./sqream/build/ClientCmd --user=sqream --password=sqream -d master"

Pause connector:
curl -X PUT localhost:8083/connectors/sqream-sink/pause

Resume the connector:
curl -X PUT localhost:8083/connectors/sqream-sink/resume

Delete connector:
curl -X DELETE localhost:8083/connectors/sqream-sink
