Example Implementation of HTTP-based Interactive Query Service

This example demonstrates the following features in Kafka Streams along with an HTTP-based interactive query service:

  1. Data ingestion
  2. Data transformation using a Kafka Streams DSL-based implementation
  3. Managing local state with key-value stores
  4. Interactive query service with HTTP endpoints

The implementation is based on the ClarkNet dataset, which must be downloaded to a local folder.

Build and Run Locally

By default, the application runs with an embedded local Kafka server. If you want to run separate Kafka and ZooKeeper servers instead, change kafka.localserver to false in application.conf.
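
A minimal sketch of that override, written in HOCON dotted form exactly as the setting is named above (adjust to the layout of your application.conf):

# use external Kafka and ZooKeeper servers instead of the embedded one
kafka.localserver = false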

To run the application, follow the steps below.

Build the Libraries

This example application depends on kafka-streams-scala and kafka-streams-query. Ensure that you have the proper versions of these libraries in your classpath. Note that in this example Scala 2.12.4 and Kafka 1.0.0 are used.

If you've made local changes to kafka-streams-query, you'll need to publish them to your local Ivy repository using sbt publishLocal from within the ./lib/ directory.
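
For example (assuming the kafka-streams-query sources are checked out under the ./lib/ directory mentioned above):

$ cd lib
$ sbt publishLocal
$ cd ..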

Start ZooKeeper and Kafka

This is only required if kafka.localserver is set to false in application.conf. If it is set to true, the application runs with an embedded local Kafka server. However, note that if you want to run the application in distributed mode (see below for details of running in distributed mode), you must run separate Kafka and ZooKeeper servers.

Start ZooKeeper and Kafka, if not already running. You can download Kafka 1.0.0 for Scala 2.12 here, then follow the Quick Start instructions for running ZooKeeper and Kafka, steps 1 and 2.
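
For reference, steps 1 and 2 of the Quick Start amount to running the following commands in separate terminals, where $KAFKA_HOME is the directory in which you installed Kafka:

$KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties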

Download the ClarkNet dataset

Download the ClarkNet dataset and put it in a convenient local folder.

Configure the Application Properties

Copy src/main/resources/application-dsl.conf.template to src/main/resources/application-dsl.conf.

Edit src/main/resources/application-dsl.conf and set the entry for directorytowatch to match the folder name where you installed the ClarkNet dataset.

Note that you can run the application with the bundled local Kafka server by setting kafka.localserver to true in the application.conf file.
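
For example, using the fully qualified key name referenced in the distributed-mode section later in this document (dcos.kafka.loader.directorytowatch), the entry might look like the following sketch; the path is illustrative:

# folder where you downloaded the ClarkNet dataset
dcos.kafka.loader.directorytowatch = "/path/to/clarknet-data"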

Create the Kafka Topics

This is only required if kafka.localserver is set to false in application.conf. If it is set to true, the application runs with an embedded local Kafka server and creates all the necessary topics on its own. However, note that if you want to run the application in distributed mode (see below for details of running in distributed mode), you must run separate Kafka and ZooKeeper servers. If you're running in distributed mode, the topics should have more than one partition.

Create the topics using the kafka-topics.sh command that comes with the Kafka distribution. We'll refer to the directory where you installed Kafka as $KAFKA_HOME. Run the following commands:

$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic logerr-dsl
$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic server-log-dsl
$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic processed-log
$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic summary-access-log
$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic windowed-summary-access-log
$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic summary-payload-log
$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic windowed-summary-payload-log
$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic avro-topic
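
You can verify that the topics were created by listing them:

$KAFKA_HOME/bin/kafka-topics.sh --list --zookeeper localhost:2181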

Run the Application!

Now run the application as follows:

$ sbt
> clean
> compile
> dsl

This will start the application. Now you can query the global state using curl:

$ ## The example application has a timer to `touch` the files in the watched
$ ## directory 1 minute after the app starts to trigger the streaming to begin.  Touch 
$ ## the ClarkNet dataset again, or add new files, to stream more entries.
$
$ ## Fetch the number of accesses made to the host world.std.com as per the downloaded 
$ ## data file
$ curl http://localhost:7070/weblog/access/world.std.com
15
$
$ ## If you specify ALL as the key name, it will fetch a list of all key-value pairs
$ ## from all the stores that hold the access information for the same application id
$ curl http://localhost:7070/weblog/access/ALL
[["204.249.225.59",1],["access9.accsyst.com",2],["cssu24.cs.ust.hk",1],["cyclom1-1-6.intersource.com",1],["d24-1.cpe.Brisbane.aone.net.au",1],["er6.rutgers.edu",1],["world.std.com",3]]
$
$ ## If you specify COUNT as the key name, it will fetch the approximate total number
$ ## of entries summed across all the stores that hold the access information for the
$ ## same application id
$ curl http://localhost:7070/weblog/access/COUNT
7
$ ## Query access counts by a range of keys.  The "from" key must be less than the "to"
$ ## key.  For example, "a.com" < "z.org"
$ curl http://localhost:7070/weblog/access/range/a.com/z.org
[["access9.accsyst.com",4],["cssu24.cs.ust.hk",2],["cyclom1-1-6.intersource.com",2],["d24-1.cpe.Brisbane.aone.net.au",2],["er6.rutgers.edu",2],["reddit.com",2],["world.std.com",6]]
$
$ ## Query a time window for a key.  The "from" and "to" parameters must be represented
$ ## as a milliseconds since epoch long number.  The "from" time must be less than the 
$ ## "to time. Stream elements are windowed using ingest time and not event time.  For 
$ ## example, get all time windows for world.std.com from epoch 0 to current epoch.
$ curl http://localhost:7070/weblog/access/win/world.std.com/0/$(date +%s%3N)
[[1517518200000,6],[1517518260000,3]]
$ ##
$ ## Fetch the number of bytes in the reply for queries to the host 
$ ## world.std.com as per the downloaded data file
$ curl http://localhost:7070/weblog/bytes/world.std.com
124532

Run in Distributed Mode

The HTTP query layer is designed to work even when your application runs in distributed mode. Running your Kafka Streams application in distributed mode means that all instances must have the same application id.

In order to run the application in distributed mode, you need to run external Kafka and ZooKeeper servers. Set kafka.localserver to false in application.conf to enable this.

Here are the steps you need to follow to run the application in distributed mode. We assume here that you are running both instances on the same node with different port numbers; it is straightforward to scale this out to different nodes.

Step 1: Build and configure for distribution

$ sbt
> dslPackage/universal:packageZipTarball

This creates a distribution under <project home>/build.

$ pwd
<project home>
$ cd build/dsl/target/universal
$ ls
dslpackage-0.0.1.tgz
## unpack the distribution
$ tar xvfz dslpackage-0.0.1.tgz
$ cd dslpackage-0.0.1
$ ls
bin	   conf	lib
$ cd conf
$ ls
application.conf	logback.xml
## change the above 2 files based on your requirements.
$ cd ..
$ pwd
<...>/dslpackage-0.0.1

Step 2: Run the first instance of the application

Ensure the following:

  1. ZooKeeper and Kafka are running
  2. All topics mentioned above are created with more than one partition
  3. The folder mentioned in directorytowatch in application.conf contains the data file

$ pwd
<...>/dslpackage-0.0.1
$ bin/dslpackage

This starts a single instance of the application. After some time you will see data printed to the console with the host access information from the data file.

In the log file, created under <...>/dslpackage-0.0.1/logs, check that the REST service has started and note the host and port details. It should be something like localhost:7070 (the default setting in application.conf).

Step 3: Run the second instance of the application

If you decide to run multiple instances of the application, you may choose to split the dataset into two parts and keep them in different folders. You also need to copy the current distribution to some other folder and start the second instance from there, since it has to run with changed settings in application.conf. Say we want to copy it into a folder named clarknet-2.

$ cp <project home>/build/dsl/target/universal/dslpackage-0.0.1.tgz clarknet-2
$ cd clarknet-2
## unpack the distribution
$ tar xvfz dslpackage-0.0.1.tgz
$ cd dslpackage-0.0.1
$ ls
bin	   conf	lib
$ cd conf
$ ls
application.conf	logback.xml
## change the above 2 files based on your requirements.
$ cd ..
$ pwd
<...>/dslpackage-0.0.1

The following settings need to be changed in application.conf before you can run the second instance (a sample set of overrides is sketched after the list):

  1. dcos.kafka.statestoredir - This is the folder where Kafka Streams persists the local state information. It has to be different for every new instance you set up.
  2. dcos.kafka.loader.directorytowatch - The data folder to watch; we want the two instances to ingest different data.
  3. dcos.http.interface and dcos.http.port - The REST service endpoint. If both instances run on the same node, the interface can be localhost for both, but the ports must differ.
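
For example, the overrides for the second instance might look like the following sketch in HOCON dotted form (the state directory and data path are illustrative; keep the rest of conf/application.conf unchanged):

# must differ from the state store directory of the first instance
dcos.kafka.statestoredir = "/tmp/dsl-instance-2-state"
# folder holding the part of the dataset for this instance
dcos.kafka.loader.directorytowatch = "/path/to/clarknet-2-data"
# same node as the first instance, so keep localhost but pick a different port
dcos.http.interface = "localhost"
dcos.http.port = 7071
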
$ pwd
<...>/dslpackage-0.0.1
$ bin/dslpackage

This will start the second instance. Check the log file to verify that the REST endpoints are properly started.

Step 4: Query the data

The idea of a distributed interactive query interface is to allow the user to query for all keys using any of the endpoints where the REST service is running. Assume that the two instances are running at localhost:7070 and localhost:7071.

Here are a few examples:

## world.std.com was loaded by the first instance of the app
## Querying via the endpoint of the first instance gives the correct result
$ curl localhost:7070/weblog/access/world.std.com
14

## we get the correct result even if we query via the endpoint of the second instance
$ curl localhost:7071/weblog/access/world.std.com
14

## ppp19.glas.apc.org was loaded by the second instance of the app
## Querying via the endpoint of the first instance also gives the correct result
$ curl localhost:7070/weblog/access/ppp19.glas.apc.org
17

Step 5: Clean up the application's Kafka Streams internal topics

When running in distributed mode, Kafka Streams state stores are backed by internal Kafka topics so that state can be restored on other instances of the app if there is a failure. To reset to a clean state, you can use the Kafka Streams Application Reset tool. This will delete the internal Kafka Streams topics associated with a specified application id. Note that you must have delete.topic.enable set to true in your broker configuration to delete topics.

An example of running this tool:

$ ./kafka-streams-application-reset.sh \
--application-id kstream-weblog-processing \
--bootstrap-servers kafka-0-broker:9092 \
--zookeeper localhost:2181
No input or intermediate topics specified. Skipping seek.
Deleting all internal/auto-created topics for application kstream-weblog-processing
Topic kstream-weblog-processing-windowed-access-count-per-host-changelog is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
Topic kstream-weblog-processing-windowed-payload-size-per-host-repartition is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
Topic kstream-weblog-processing-access-count-per-host-changelog is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
Topic kstream-weblog-processing-payload-size-per-host-repartition is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
Topic kstream-weblog-processing-windowed-payload-size-per-host-changelog is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
Topic kstream-weblog-processing-windowed-access-count-per-host-repartition is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
Topic kstream-weblog-processing-payload-size-per-host-changelog is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
Topic kstream-weblog-processing-access-count-per-host-repartition is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
Done.