A collection of common Flink usage examples and utilities. At the moment, it contains the following jobs:
- CkanDatasetImporter: shows how to generate a Flink DataSet from a CKAN catalog (and resource)
- Csv2RowExample: shows how to generate a Flink DataSet from a CSV file using either
  - flink-table-api: doesn't properly handle string fields containing double-quoted tokens (see https://issues.apache.org/jira/browse/FLINK-4785)
  - Apache Commons CSV (commons-csv): reads all fields as strings
- ElastisearchHelper: shows how to create Elasticsearch index templates and index mappings, allowing configuration of
  - the number of shards: needed in most cases (see https://issues.apache.org/jira/browse/FLINK-4491)
  - the number of replicas
  - stop words, filters and mappings
- KafkaFlinkAvroParquet: shows how to integrate Kafka, Flink, Avro and Parquet. In particular:
  - AvroSerializationSchema: serializes an object to byte[]
  - AvroDeserializationSchema: deserializes the byte[] back into an object
  - the serialized objects are passed through a Kafka topic
- JsonStringToPojo: reads a JSON file and deserializes it into a POJO
- Pojo2JsonString: serializes a POJO to JSON and writes it to a file
- EventCountJobLegacy: Queryable State example (deprecated API)
- EventCountJobNew: Queryable State example (new API)
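To make the FLINK-4785 limitation mentioned for Csv2RowExample concrete, the sketch below splits a single CSV line following the RFC 4180 quoting convention: a field wrapped in double quotes may contain the delimiter, and a literal quote inside it is written as `""`. `QuotedCsvSplitter` is a hypothetical, JDK-only helper that is not part of this repository, and it does not handle quoted fields spanning several lines. Like Apache Commons CSV, it returns every field as a `String`, so `main` converts the numeric fields explicitly.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: splits one CSV line, honouring RFC 4180 double-quote rules. */
public final class QuotedCsvSplitter {

    private QuotedCsvSplitter() {}

    public static List<String> split(String line, char delimiter) {
        List<String> fields = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuotes = false;
        int i = 0;
        while (i < line.length()) {
            char c = line.charAt(i);
            if (inQuotes) {
                if (c == '"') {
                    // A doubled quote inside a quoted field is an escaped literal quote.
                    if (i + 1 < line.length() && line.charAt(i + 1) == '"') {
                        current.append('"');
                        i += 2;
                        continue;
                    }
                    inQuotes = false; // closing quote of the field
                } else {
                    current.append(c); // delimiters inside quotes are kept as text
                }
            } else if (c == '"') {
                inQuotes = true; // opening quote of the field
            } else if (c == delimiter) {
                fields.add(current.toString());
                current.setLength(0);
            } else {
                current.append(c);
            }
            i++;
        }
        fields.add(current.toString()); // the last field has no trailing delimiter
        return fields;
    }

    public static void main(String[] args) {
        String line = "42,\"He said \"\"hello, world\"\"\",3.5";
        List<String> fields = split(line, ',');
        // Every field comes back as a String; typed values need an explicit conversion step.
        int id = Integer.parseInt(fields.get(0));
        String text = fields.get(1);
        double score = Double.parseDouble(fields.get(2));
        System.out.println(id + " | " + text + " | " + score);
    }
}
```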
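As an illustration of the kind of settings ElastisearchHelper configures, the following sketch builds the JSON body of an index template that fixes the number of shards and replicas and defines a custom analyzer with a stop-word filter. It is a hypothetical helper, not the repository's code, and it assumes Elasticsearch 6.x or later, where a legacy template is sent with `PUT _template/<name>` and matches indices via `index_patterns` (5.x used a `template` field instead). The filter and analyzer names are illustrative, and mappings are omitted because their shape changed across Elasticsearch versions.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical sketch: JSON body for an index template with shard, replica and stop-word settings. */
public final class IndexTemplateBody {

    private IndexTemplateBody() {}

    public static String build(String indexPattern, int shards, int replicas, List<String> stopWords) {
        // Naive quoting: assumes the pattern and stop words need no JSON escaping.
        String words = stopWords.stream()
                .map(w -> "\"" + w + "\"")
                .collect(Collectors.joining(","));
        return "{"
                + "\"index_patterns\":[\"" + indexPattern + "\"],"
                + "\"settings\":{"
                + "\"number_of_shards\":" + shards + ","
                + "\"number_of_replicas\":" + replicas + ","
                + "\"analysis\":{"
                // Stop-word token filter with an explicit word list.
                + "\"filter\":{\"custom_stop\":{\"type\":\"stop\",\"stopwords\":[" + words + "]}},"
                // Custom analyzer: standard tokenizer, then lowercase, then the stop filter.
                + "\"analyzer\":{\"custom_text\":{\"type\":\"custom\",\"tokenizer\":\"standard\","
                + "\"filter\":[\"lowercase\",\"custom_stop\"]}}"
                + "}" // end analysis
                + "}" // end settings
                + "}";
    }

    public static void main(String[] args) {
        System.out.println(build("events-*", 3, 1, Arrays.asList("the", "a", "an")));
    }
}
```

Shards are set up front because `number_of_shards` cannot be changed on an existing index without reindexing (or the shrink/split APIs), whereas `number_of_replicas` can be updated at any time.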
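In KafkaFlinkAvroParquet, the direction of each conversion follows Flink's interfaces: `SerializationSchema<T>` turns an element into `byte[]` for the Kafka producer, and `DeserializationSchema<T>` turns the `byte[]` read by the consumer back into an element. The sketch below is a JDK-only stand-in for that round trip; it uses `DataOutputStream`/`DataInputStream` instead of Avro, and `EventCodec` and `Event` are hypothetical names. An Avro-based version would typically write the record with a `SpecificDatumWriter` and a binary encoder from `EncoderFactory`, and read it back with a `SpecificDatumReader` and `DecoderFactory`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** JDK-only stand-in for the Avro (de)serialization schemas: object -> byte[] -> object. */
public final class EventCodec {

    /** Hypothetical record standing in for an Avro-generated class. */
    public static final class Event {
        public final String name;
        public final long timestamp;

        public Event(String name, long timestamp) {
            this.name = name;
            this.timestamp = timestamp;
        }
    }

    /** Producer side, same direction as a SerializationSchema: object -> byte[]. */
    public static byte[] serialize(Event event) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(event.name);
            out.writeLong(event.timestamp);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray(); // these bytes are what would travel through Kafka
    }

    /** Consumer side, same direction as a DeserializationSchema: byte[] -> object. */
    public static Event deserialize(byte[] message) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(message))) {
            return new Event(in.readUTF(), in.readLong());
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] payload = serialize(new Event("click", 1_000L));
        Event back = deserialize(payload);
        System.out.println(back.name + " @ " + back.timestamp + " (" + payload.length + " bytes)");
    }
}
```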
To set up the Kafka test environment, download the Kafka 0.10.2.0 release (built for Scala 2.11) and extract it:
> tar -xzf kafka_2.11-0.10.2.0.tgz
> cd kafka_2.11-0.10.2.0
Kafka depends on ZooKeeper, so first start a single-node ZooKeeper instance using the server packaged with Kafka. The provided config/zookeeper.properties file already configures it to listen on port 2181:
> bin/zookeeper-server-start.sh config/zookeeper.properties
Now you can start the Kafka server (broker), which listens on port 9092:
> bin/kafka-server-start.sh config/server.properties
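Before running the example, it can help to confirm that both servers are actually listening. This hypothetical `PortCheck` helper (JDK only, not part of the repository) attempts a plain TCP connection to each `host:port` pair used above; a successful connect only shows that something is listening on the port, not that it is a healthy ZooKeeper or Kafka process.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical pre-flight check: is something listening on host:port? */
public final class PortCheck {

    private PortCheck() {}

    /** Parses a "host:port" string such as "localhost:2181" and tries a TCP connect. */
    public static boolean isListening(String hostAndPort, int timeoutMillis) {
        int colon = hostAndPort.lastIndexOf(':');
        String host = hostAndPort.substring(0, colon);
        int port = Integer.parseInt(hostAndPort.substring(colon + 1));
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false; // connection refused, timed out, or host unknown
        }
    }

    public static void main(String[] args) {
        for (String endpoint : new String[] {"localhost:2181", "localhost:9092"}) {
            System.out.println(endpoint + " reachable: " + isListening(endpoint, 1000));
        }
    }
}
```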
Before any messages can be exchanged, create a topic named test:
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
The example works only with a topic named "test", ZooKeeper on port 2181 and Kafka on port 9092. If you want to change the topic name or the ports, remember to also update the corresponding fields in the Java code:
static String topicId = "test";
static String kafkaPort = "localhost:9092";
static String zkPort = "localhost:2181";
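For reference, the sketch below shows how these three values would typically map onto the properties passed to a Flink Kafka consumer: `bootstrap.servers` for the broker and `zookeeper.connect` for ZooKeeper, the latter read only by clients that talk to ZooKeeper directly, such as Flink's Kafka 0.8 consumer. `KafkaSettings` is a hypothetical class, not the example's actual code; the `example.*` system-property overrides and the group id are assumptions added so the values can be changed with `-D` flags instead of editing the source.

```java
import java.util.Properties;

/** Hypothetical sketch: turning the example's connection constants into Kafka client properties. */
public final class KafkaSettings {

    // Defaults mirror the constants above; the example.* system properties are hypothetical overrides.
    static String topicId = System.getProperty("example.topic", "test");
    static String kafkaPort = System.getProperty("example.kafka", "localhost:9092");
    static String zkPort = System.getProperty("example.zookeeper", "localhost:2181");

    private KafkaSettings() {}

    public static Properties consumerProperties(String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", kafkaPort); // broker address, host:port
        props.setProperty("zookeeper.connect", zkPort);    // only used by ZooKeeper-based clients
        props.setProperty("group.id", groupId);            // consumer group; name is illustrative
        return props;
    }

    public static void main(String[] args) {
        System.out.println("topic=" + topicId + " " + consumerProperties("flink-avro-example"));
    }
}
```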
The example class manages the producer and the consumer itself: it generates messages, sends them through the Kafka topic and reads them back. Just run the KafkaFlinkAvroParquet class.