Kafka Standalone Consumer [Indexer] reads messages from Kafka in batches, processes them, and bulk-indexes them into ElasticSearch.
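The read, process, and bulk-index flow can be illustrated with a small self-contained Java sketch. It splits a list of already-polled messages into fixed-size batches and turns each batch into one "bulk request". This is not the indexer's code. The BatchIndexerSketch class, the in-memory list standing in for Kafka, and the list of documents standing in for an ElasticSearch bulk request are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch of the batch flow; not the indexer's actual code.
// An in-memory list stands in for messages polled from Kafka, and each
// returned List<String> stands in for one ElasticSearch bulk request.
class BatchIndexerSketch {

    static List<List<String>> run(List<String> messages, int batchSize,
                                  Function<String, String> transform) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<String>> bulkRequests = new ArrayList<>();
        // Walk the messages one batch at a time.
        for (int start = 0; start < messages.size(); start += batchSize) {
            int end = Math.min(start + batchSize, messages.size());
            List<String> bulk = new ArrayList<>();
            // Process (transform) every message in the current batch...
            for (String raw : messages.subList(start, end)) {
                bulk.add(transform.apply(raw));
            }
            // ...and emit the whole batch as a single "bulk request".
            bulkRequests.add(bulk);
        }
        return bulkRequests;
    }
}
```

For example, BatchIndexerSketch.run(Arrays.asList("a", "b", "c", "d", "e"), 2, String::toUpperCase) yields three bulk requests: [A, B], [C, D], and [E].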
To build and run the indexer as a standard jar:
1. Download the code into a $INDEXER_HOME directory.
2. Update all relevant properties in $INDEXER_HOME/src/main/resources/config/kafka-es-indexer.properties, as explained in the comments in that file.
3. In $INDEXER_HOME/src/main/resources/config/logback.xml, specify the directory you want to store logs in: <property name="LOG_DIR" value="/tmp"/>. Adjust the maximum log file sizes and the number of log files as needed.
4. Configure the consumer start options (start from earliest, latest, etc.) in $INDEXER_HOME/src/main/resources/config/kafka-es-indexer-start-options.config; more details are inside the file.
5. Modify $INDEXER_HOME/src/main/resources/spring/kafka-es-context-public.xml if needed. If you want to use a custom IMessageHandler class, specify it in the following config (make sure to modify only the class name, not the bean's name/scope):
    <bean id="messageHandler" class="org.elasticsearch.kafka.indexer.service.impl.examples.SimpleMessageHandlerImpl" scope="prototype"/>
6. Build the app:
    cd $INDEXER_HOME
    ./gradlew clean jar
   The kafka-elasticsearch-consumer-0.0.2.0.jar will be created in the $INDEXER_HOME/build/libs/ directory.
7. Make sure your $JAVA_HOME env variable is set (use JDK 1.8 or above); you may want to adjust JVM options and other values in the gradlew script and the gradle.properties file.
8. Run the app:
    ./gradlew run -Dindexer.properties=$INDEXER_HOME/src/main/resources/config/kafka-es-indexer.properties -Dlogback.configurationFile=$INDEXER_HOME/src/main/resources/config/logback.xml
To run the app as a script instead, steps 1 - 6 are the same; then:
7. Run:
    ./gradlew clean installDist
8. Run the kafka-elasticsearch-consumer script:
    ./kafka-elasticsearch-consumer -Dindexer.properties=$INDEXER_HOME/src/main/resources/config/kafka-es-indexer.properties -Dlogback.configurationFile=$INDEXER_HOME/src/main/resources/config/logback.xml
Versions:
- Kafka: 1.0.x
- ElasticSearch: 5.5.x
- JDK: 1.8
Indexer application properties are specified in the kafka-es-indexer.properties file; adjust them for your environment.
You can specify your own properties file via -Dindexer.properties=/abs-path/your-kafka-es-indexer.properties
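When it is passed to the JVM that runs the indexer, a -D option like this becomes a Java system property. The sketch below shows one common way such a property can be read and the referenced file loaded with java.util.Properties. The IndexerPropertiesSketch class is hypothetical, and the indexer's own loading code may differ.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;

// Hypothetical sketch: how a -Dindexer.properties=/abs-path/... option can be
// read in Java. The indexer's own loading code may differ.
class IndexerPropertiesSketch {

    static Properties load() throws IOException {
        // -Dname=value on the JVM command line becomes a system property.
        String path = System.getProperty("indexer.properties");
        if (path == null) {
            throw new IllegalStateException("indexer.properties system property is not set");
        }
        Properties props = new Properties();
        // Read key=value pairs from the file; try-with-resources closes the stream.
        try (InputStream in = Files.newInputStream(Paths.get(path))) {
            props.load(in);
        }
        return props;
    }
}
```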
Logging properties are specified in the logback.xml file; adjust them for your environment.
You can specify your own logback config file via the -Dlogback.configurationFile=/abs-path/your-logback.xml property.
Indexer application Spring configuration is specified in the kafka-es-context-public.xml file.
Consumer start options are specified in the kafka-es-indexer-start-options.config file; by default, the RESTART option is used for all partitions.
You can specify your own configuration file via -Doffsets.config.path=/abs-path/your-kafka-es-indexer-start-options.config
The indexer application can be easily customized. The main areas for customization are:
- message handling/conversion; example use cases for this customization:
  - your incoming messages are not in a JSON format compatible with the expected ES message formats
  - your messages have to be enriched with data from other sources (via other metadata lookups, etc.)
  - you want to selectively index messages into ES based on some custom criteria
- index name/type customization
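The three message-handling use cases above can be made concrete with a hypothetical Java sketch that covers each of them. It converts a non-JSON message into a JSON document, enriches it from a lookup table, and skips messages that should not be indexed. The "userId|action" input format, the heartbeat filter rule, the in-memory region map, and the MessageCustomizationSketch class are illustrative assumptions, not part of the indexer.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of custom conversion, enrichment, and filtering.
// Input format (assumed, not JSON): "userId|action", e.g. "42|login".
class MessageCustomizationSketch {
    // Stands in for an external metadata lookup (assumption).
    private final Map<String, String> userRegion;

    MessageCustomizationSketch(Map<String, String> userRegion) {
        this.userRegion = userRegion;
    }

    // Returns a JSON document for ES, or empty if the message is skipped.
    Optional<String> toDocument(String raw) {
        String[] parts = raw.split("\\|", -1);
        if (parts.length != 2) {
            return Optional.empty(); // malformed message: skip it
        }
        String userId = parts[0], action = parts[1];
        if (action.equals("heartbeat")) {
            return Optional.empty(); // selective indexing: drop heartbeats
        }
        String region = userRegion.getOrDefault(userId, "unknown"); // enrichment
        return Optional.of(String.format(
                "{\"userId\":\"%s\",\"action\":\"%s\",\"region\":\"%s\"}",
                escape(userId), escape(action), escape(region)));
    }

    // Minimal JSON string escaping (backslash and quote only).
    private static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }
}
```

For example, with a lookup table mapping "42" to "eu", the message 42|login becomes {"userId":"42","action":"login","region":"eu"}, while 42|heartbeat is not indexed.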
Message handling can be customized by implementing the IMessageHandler interface:
org.elasticsearch.kafka.indexer.service.IMessageHandler
is an interface that defines the main methods for reading events from Kafka, processing them, and bulk-indexing them into ElasticSearch. You can provide custom behavior for all or some of the methods as needed. You can customize:
- the transformMessage(...) method, to transform an event from one format into another;
- the addEventToBatch(...) method, to add an event to the specified (or a custom) index, with or without routing info;
- the postToElasticSearch(...) method; most likely you won't need to customize this one.
To do this, you can implement the IMessageHandler interface, inject the ElasticSearchBatchService into your implementation class, and delegate most of the methods to it; ElasticSearchBatchService provides the basic batching operations.
See org.elasticsearch.kafka.indexer.service.impl.examples.SimpleMessageHandlerImpl
for an example of such customization.
Don't forget to specify your custom message handler class in the kafka-es-context-public.xml file. By default, SimpleMessageHandlerImpl is used.
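The delegation pattern described above can be sketched as follows. MessageHandlerSketch and InMemoryBatchService are simplified, hypothetical stand-ins for IMessageHandler and ElasticSearchBatchService; the real interfaces have different signatures, so check the sources. The point is that only transformMessage(...) carries custom logic, while batching and posting are delegated to the injected service.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, HYPOTHETICAL stand-ins for IMessageHandler and
// ElasticSearchBatchService; the real interfaces have other signatures.
interface MessageHandlerSketch {
    String transformMessage(String rawMessage);
    void addEventToBatch(String document, String indexName);
    int postToElasticSearch();
}

class InMemoryBatchService {
    private final List<String[]> pending = new ArrayList<>();

    void addEvent(String indexName, String document) {
        pending.add(new String[] {indexName, document});
    }

    // A real batch service would send one bulk request here;
    // this stand-in just reports the batch size and clears it.
    int flush() {
        int sent = pending.size();
        pending.clear();
        return sent;
    }
}

class PlainTextToJsonHandler implements MessageHandlerSketch {
    private final InMemoryBatchService batchService;

    // The batch service is injected (via the constructor in this sketch).
    PlainTextToJsonHandler(InMemoryBatchService batchService) {
        this.batchService = batchService;
    }

    // Custom logic: wrap a plain-text line into a minimal JSON document.
    @Override
    public String transformMessage(String rawMessage) {
        String escaped = rawMessage.replace("\\", "\\\\").replace("\"", "\\\"");
        return "{\"message\":\"" + escaped + "\"}";
    }

    // Delegated: batching is left to the batch service.
    @Override
    public void addEventToBatch(String document, String indexName) {
        batchService.addEvent(indexName, document);
    }

    // Delegated: posting the batch is left to the batch service.
    @Override
    public int postToElasticSearch() {
        return batchService.flush();
    }
}
```

Constructor injection is used here only to keep the sketch free of Spring; in the application, the handler bean is wired through kafka-es-context-public.xml.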
Index name and index type determination can be customized by providing custom logic in your implementation of the IMessageHandler interface. The example
org.elasticsearch.kafka.indexer.service.impl.examples.SimpleMessageHandlerImpl
uses the elasticsearch.index.name and elasticsearch.index.type values configured in the kafka-es-indexer.properties file. If you want to use custom logic, add it to the addEventToBatch(...) method.
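As one example of custom index-name logic for addEventToBatch(...), the sketch below derives a daily index name (for example, logs-2018.03.05) from an event's timestamp. When no timestamp is available, it falls back to the configured base name. The daily-index naming scheme and the IndexNameSketch class are assumptions for illustration; in practice, the base name would come from elasticsearch.index.name.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical sketch of custom index naming: one index per UTC day.
class IndexNameSketch {
    // Instants carry no calendar fields, so the formatter needs a zone.
    private static final DateTimeFormatter DAY =
            DateTimeFormatter.ofPattern("yyyy.MM.dd").withZone(ZoneOffset.UTC);

    // baseIndexName would typically be the configured elasticsearch.index.name.
    static String indexFor(String baseIndexName, Instant eventTime) {
        if (eventTime == null) {
            return baseIndexName; // no timestamp: fall back to the configured name
        }
        return baseIndexName + "-" + DAY.format(eventTime);
    }
}
```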
kafka-elasticsearch-standalone-consumer
Licensed under the Apache License, Version 2.0 (the "License"); you may
not use this file except in compliance with the License. You may obtain
a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.