(A Traditional Chinese version of this document is also available: README-CH.md.)
A data pipeline built with Flink that transfers data from Kafka to Kafka, Doris, and MongoDB, and can also merge two data sources.
- Platform: JDK 11
- Build Tool: Apache Maven v3.9.6
- Data Processing Framework: Flink v1.18.1
mvn clean package
docker compose build
Replace YourJavaClass with the name of the class you want to run:
docker compose run --rm -e MY_CLASS=YourJavaClass myFlinkJob
Transfer all messages from topic-source in Kafka (localhost:9092) to topic-sink in Kafka (localhost:9092).
Back up all topics matching ^topicV.* in Kafka (localhost:9092) to topics of the same name in Kafka (localhost:9093), Kafka (localhost:9094), and Kafka (localhost:9095).
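
The routing decision behind this backup job (which topics match ^topicV.* and which clusters each one is copied to) can be sketched in plain Java as below. This is a minimal sketch, not the project's implementation: the classes BackupRoute and TopicBackupPlanner are hypothetical. In an actual Flink job the pattern would more likely be handed to the Kafka connector (for example through KafkaSourceBuilder.setTopicPattern), with one sink per target cluster; the sketch only shows the matching and fan-out, keeping each topic name unchanged.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

/** One copy instruction: records from topic go to the same topic name on targetBootstrap (hypothetical class). */
final class BackupRoute {
    final String topic;
    final String targetBootstrap;

    BackupRoute(String topic, String targetBootstrap) {
        this.topic = topic;
        this.targetBootstrap = targetBootstrap;
    }

    @Override
    public String toString() {
        return topic + " -> " + targetBootstrap;
    }
}

final class TopicBackupPlanner {
    // Pattern and target addresses taken from the job description.
    static final Pattern TOPIC_PATTERN = Pattern.compile("^topicV.*");
    static final List<String> TARGETS =
            List.of("localhost:9093", "localhost:9094", "localhost:9095");

    /** Returns one route per (matching topic, target cluster) pair; non-matching topics are skipped. */
    static List<BackupRoute> plan(List<String> sourceTopics) {
        List<BackupRoute> routes = new ArrayList<>();
        for (String topic : sourceTopics) {
            if (!TOPIC_PATTERN.matcher(topic).matches()) {
                continue;
            }
            for (String target : TARGETS) {
                // The topic name is kept as-is on every backup cluster.
                routes.add(new BackupRoute(topic, target));
            }
        }
        return routes;
    }
}
```

For example, plan(List.of("topicV1", "topic-source")) yields three routes for topicV1 (one per backup cluster) and none for topic-source.
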
Split the data array in topic-sensor in Kafka (localhost:9092) into individual readings and insert them into the Doris (localhost:9030) table database.sensor.
- Kafka Topic
topic-sensor
Message
{
"location": "Area A",
"timestamp": "2024-03-25T08:00:00",
"data": [
{
"sensorId": "sensor001",
"sensorType": "Temperature",
"value": 25.5,
"unit": "Celsius"
},
{
"sensorId": "sensor002",
"sensorType": "Humidity",
"value": 60.2,
"unit": "%"
}
]
}
- Doris Table
database.sensor
| id | type | location | timestamp | value | unit |
|-----------|---------------|-------------|---------------------|-------|---------|
| sensor001 | Temperature | Area A | 2024-03-25T08:00:00 | 25.5 | Celsius |
| sensor002 | Humidity | Area A | 2024-03-25T08:00:00 | 60.2 | % |
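
The split step above can be sketched as a pure function from one topic-sensor message to a list of database.sensor rows. This is a minimal sketch under stated assumptions, not the project's code: the classes SensorReading, SensorMessage, SensorRow, and SensorFlattener are hypothetical, JSON parsing is omitted, and the timestamp is kept as a string. In a Flink job, the loop body would typically live inside a FlatMapFunction, emitting each row through its Collector instead of returning a list.

```java
import java.util.ArrayList;
import java.util.List;

/** One element of the "data" array in a topic-sensor message (hypothetical model class). */
final class SensorReading {
    final String sensorId;
    final String sensorType;
    final double value;
    final String unit;

    SensorReading(String sensorId, String sensorType, double value, String unit) {
        this.sensorId = sensorId;
        this.sensorType = sensorType;
        this.value = value;
        this.unit = unit;
    }
}

/** A whole topic-sensor message: location and timestamp are shared by every reading. */
final class SensorMessage {
    final String location;
    final String timestamp;
    final List<SensorReading> data;

    SensorMessage(String location, String timestamp, List<SensorReading> data) {
        this.location = location;
        this.timestamp = timestamp;
        this.data = data;
    }
}

/** One row of the Doris table database.sensor. */
final class SensorRow {
    final String id;
    final String type;
    final String location;
    final String timestamp;
    final double value;
    final String unit;

    SensorRow(String id, String type, String location, String timestamp, double value, String unit) {
        this.id = id;
        this.type = type;
        this.location = location;
        this.timestamp = timestamp;
        this.value = value;
        this.unit = unit;
    }
}

final class SensorFlattener {
    /** Emits one row per element of data, copying the message-level location and timestamp onto each row. */
    static List<SensorRow> flatten(SensorMessage message) {
        List<SensorRow> rows = new ArrayList<>(message.data.size());
        for (SensorReading reading : message.data) {
            rows.add(new SensorRow(reading.sensorId, reading.sensorType,
                    message.location, message.timestamp, reading.value, reading.unit));
        }
        return rows;
    }
}
```

Applied to the example message, flatten returns the two rows shown in the database.sensor table.
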
Read the rows of the Doris (localhost:9030) table database.sensor, collect them into an array named data, and send the result to topic-sensor in Kafka (localhost:9092).
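
The reverse conversion can be sketched as grouping rows back into messages. The grouping key is not stated in the description; this sketch assumes that rows sharing the same location and timestamp belong to one message. The classes SensorRow, SensorReading, SensorMessage, and SensorGrouper are hypothetical. In a streaming job the same idea would more likely be expressed with keyBy on that key plus some windowing, so the job knows when a group is complete; the sketch only shows the grouping itself over a finite list.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** One row read from the Doris table database.sensor (hypothetical model class). */
final class SensorRow {
    final String id;
    final String type;
    final String location;
    final String timestamp;
    final double value;
    final String unit;

    SensorRow(String id, String type, String location, String timestamp, double value, String unit) {
        this.id = id;
        this.type = type;
        this.location = location;
        this.timestamp = timestamp;
        this.value = value;
        this.unit = unit;
    }
}

/** One element of the outgoing "data" array. */
final class SensorReading {
    final String sensorId;
    final String sensorType;
    final double value;
    final String unit;

    SensorReading(String sensorId, String sensorType, double value, String unit) {
        this.sensorId = sensorId;
        this.sensorType = sensorType;
        this.value = value;
        this.unit = unit;
    }
}

/** One outgoing topic-sensor message. */
final class SensorMessage {
    final String location;
    final String timestamp;
    final List<SensorReading> data;

    SensorMessage(String location, String timestamp, List<SensorReading> data) {
        this.location = location;
        this.timestamp = timestamp;
        this.data = data;
    }
}

final class SensorGrouper {
    /** Assumption: rows with equal (location, timestamp) form one message; output keeps first-seen order. */
    static List<SensorMessage> group(List<SensorRow> rows) {
        Map<List<String>, SensorMessage> byKey = new LinkedHashMap<>();
        for (SensorRow row : rows) {
            List<String> key = List.of(row.location, row.timestamp);
            SensorMessage message = byKey.computeIfAbsent(key,
                    k -> new SensorMessage(row.location, row.timestamp, new ArrayList<>()));
            message.data.add(new SensorReading(row.id, row.type, row.value, row.unit));
        }
        return new ArrayList<>(byKey.values());
    }
}
```
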
- Doris Table
database.sensor
| id | type | location | timestamp | value | unit |
|-----------|---------------|-------------|---------------------|-------|---------|
| sensor001 | Temperature | Area A | 2024-03-25T08:00:00 | 25.5 | Celsius |
| sensor002 | Humidity | Area A | 2024-03-25T08:00:00 | 60.2 | % |
- Kafka Topic
topic-sensor
Message
{
"location": "Area A",
"timestamp": "2024-03-25T08:00:00",
"data": [
{
"sensorId": "sensor001",
"sensorType": "Temperature",
"value": 25.5,
"unit": "Celsius"
}
]
}
Split the data array from topic-sensor in Kafka (localhost:9092) into individual readings and combine each reading with the equipment and sensor settings from topic-setting. Then write the resulting rows to the Doris (localhost:9030) table database.monitoring_data.
- Kafka Topic
topic-sensor
Message
{
"location": "Area A",
"timestamp": "2024-03-25T08:00:00",
"data": [
{
"sensorId": "sensor001",
"sensorType": "Temperature",
"value": 25.5,
"unit": "Celsius"
},
{
"sensorId": "sensor002",
"sensorType": "Humidity",
"value": 60.2,
"unit": "%"
}
]
}
- Kafka Topic
topic-setting
Message
{
"equipments": [
{
"id": "equipment001",
"name": "Machine 1",
"location": "Area A"
}
],
"sensors": [
{
"id": "sensor001",
"equipments": ["equipment001", "equipment002"]
},
{
"id": "sensor002",
"equipments": ["equipment001", "equipment003"]
}
]
}
- Doris Table
database.monitoring_data
| equipment_id | sensor_id | sensor_type | sensor_timestamp | sensor_value | sensor_unit |
|---------------|-----------|---------------|-----------------------|--------------|--------------|
| equipment001 | sensor001 | Temperature | 2024-05-02T08:00:00 | 25.5 | Celsius |
| equipment001 | sensor002 | Humidity | 2024-05-02T08:00:00 | 60.2 | % |
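
The per-reading join can be sketched as follows, assuming the content of topic-setting has already been loaded into two lookup structures: a map from sensor id to attached equipment ids (from sensors) and the set of declared equipment ids (from equipments). Skipping equipment ids that are not declared under equipments is an assumption chosen so that the example yields only the equipment001 rows; the classes SensorReading, MonitoringRow, and MonitoringJoiner are hypothetical. In Flink, keeping the latest settings available to every parallel task is commonly done with the broadcast state pattern (BroadcastProcessFunction), with topic-setting as the broadcast stream; this sketch covers only the join logic itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** One element of the "data" array in a topic-sensor message (hypothetical model class). */
final class SensorReading {
    final String sensorId;
    final String sensorType;
    final double value;
    final String unit;

    SensorReading(String sensorId, String sensorType, double value, String unit) {
        this.sensorId = sensorId;
        this.sensorType = sensorType;
        this.value = value;
        this.unit = unit;
    }
}

/** One row of the Doris table database.monitoring_data. */
final class MonitoringRow {
    final String equipmentId;
    final String sensorId;
    final String sensorType;
    final String sensorTimestamp;
    final double sensorValue;
    final String sensorUnit;

    MonitoringRow(String equipmentId, String sensorId, String sensorType,
                  String sensorTimestamp, double sensorValue, String sensorUnit) {
        this.equipmentId = equipmentId;
        this.sensorId = sensorId;
        this.sensorType = sensorType;
        this.sensorTimestamp = sensorTimestamp;
        this.sensorValue = sensorValue;
        this.sensorUnit = sensorUnit;
    }
}

final class MonitoringJoiner {
    /**
     * Produces one row per (reading, attached equipment) pair.
     * sensorEquipments: sensor id -> equipment ids ("sensors" in topic-setting).
     * knownEquipments: ids declared under "equipments" in topic-setting.
     */
    static List<MonitoringRow> join(String timestamp, List<SensorReading> readings,
                                    Map<String, List<String>> sensorEquipments,
                                    Set<String> knownEquipments) {
        List<MonitoringRow> rows = new ArrayList<>();
        for (SensorReading r : readings) {
            // A sensor with no settings entry produces no rows.
            for (String equipmentId : sensorEquipments.getOrDefault(r.sensorId, List.of())) {
                // Assumption: equipment ids not declared under "equipments" are skipped.
                if (!knownEquipments.contains(equipmentId)) {
                    continue;
                }
                rows.add(new MonitoringRow(equipmentId, r.sensorId, r.sensorType,
                        timestamp, r.value, r.unit));
            }
        }
        return rows;
    }
}
```
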
Transfer messages from topic in Kafka (localhost:9092) to database.collection in MongoDB (localhost:27017).