diff --git a/application/apache-kafka-flink-streaming/.env b/application/apache-kafka-flink-streaming/.env
new file mode 100644
index 00000000..845309d6
--- /dev/null
+++ b/application/apache-kafka-flink-streaming/.env
@@ -0,0 +1,20 @@
+CRATEDB_HOST=crate
+CRATEDB_PORT=4200
+CRATEDB_PG_PORT=5432
+
+WEATHER_PRODUCER_CITY=Vienna
+WEATHER_PRODUCER_API_KEY=#GET THE API KEY - https://www.weatherapi.com/
+WEATHER_PRODUCER_FETCH_EVERY_SECONDS=30
+WEATHER_PRODUCER_KAFKA_TOPIC=weather_topic
+WEATHER_PRODUCER_KAFKA_BOOTSTRAP_SERVER=kafka
+
+FLINK_CONSUMER_KAFKA_TOPIC=weather_topic
+FLINK_CONSUMER_BOOTSTRAP_SERVER=kafka
+FLINK_CONSUMER_CRATEDB_PG_URI=jdbc:postgresql://crate:5432/crate
+FLINK_CONSUMER_CRATEDB_USER=crate
+FLINK_CONSUMER_CRATEDB_PASSWORD=empty
+
+# Jar versions.
+POSTGRESQL_JAR_VERSION=42.7.2
+FLINK_CONNECTOR_JDBC_VERSION=3.1.2-1.18
+FLINK_KAFKA_JAR_URL_VERSION=3.1.0-1.18
\ No newline at end of file
diff --git a/application/apache-kafka-flink-streaming/Dockerfile b/application/apache-kafka-flink-streaming/Dockerfile
new file mode 100644
index 00000000..0f1b1543
--- /dev/null
+++ b/application/apache-kafka-flink-streaming/Dockerfile
@@ -0,0 +1,7 @@
+FROM python:3.10
+
+WORKDIR /app
+COPY * /app
+
+RUN pip install poetry
+RUN poetry config virtualenvs.create false && poetry install
diff --git a/application/apache-kafka-flink-streaming/README.md b/application/apache-kafka-flink-streaming/README.md
new file mode 100644
index 00000000..580e8426
--- /dev/null
+++ b/application/apache-kafka-flink-streaming/README.md
@@ -0,0 +1,142 @@
+# Streaming data with Apache Kafka, Apache Flink and CrateDB
+
+## About
+
+This example showcases what a data-streaming architecture leveraging Kafka and Flink could look
+like.
+
+We use:
+
+- Kafka (Confluent)
+- Apache Flink
+- CrateDB
+- Python (>=3.7, <=3.11)
+
+## Overview
+
+An HTTP call is scheduled to run every 30 seconds (see `WEATHER_PRODUCER_FETCH_EVERY_SECONDS`)
+on `weather_producer`; the API returns a JSON document with the specified city's weather,
+which is then sent through `Kafka`.
+
+`flink_consumer` is a Flink application consuming the same Kafka topic;
+upon receiving data, it sends the resulting data stream to the sink, which is CrateDB.
+
+Both `flink_consumer` and `weather_producer` are written using their respective Python wrappers:
+
+- [kafka-python](https://kafka-python.readthedocs.io/en/master/)
+- [apache-flink](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/python/overview/)
+
+Everything is customizable via environment variables: the API schedule, the topic,
+credentials, etc.
+
+See `.env` for more details.
+
+## How to use
+
+The Docker Compose configuration will get you started quickly.
+You will need to fill the API key from [Weather API](https://www.weatherapi.com/)
+into your local `.env` file.
+
+### Run the docker compose (and build the images)
+
+```
+docker compose up -d --build
+```
+
+### Stop the docker compose
+
+```
+docker compose down
+```
+
+To install the dependencies locally instead, use either Poetry or pip.
+
+### Poetry
+
+```
+poetry install
+```
+
+### Pip
+
+```
+pip install -r requirements.txt
+```
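+
+### Check the data in CrateDB
+
+Once the stack is up, the first messages have been produced, and the sink table exists
+(see "Data and schema" below), you can check that weather data is arriving. This is a
+minimal sketch using the `crate` Python client (already pinned in `requirements.txt`),
+assuming the default ports and table name from this example:
+
+```python
+from crate import client
+
+# Connect to CrateDB's HTTP endpoint, published on port 4200 by docker-compose.yml.
+connection = client.connect('http://localhost:4200', username='crate')
+cursor = connection.cursor()
+cursor.execute("SELECT location['name'], current['temp_c'] FROM doc.weather_flink_sink LIMIT 5")
+print(cursor.fetchall())
+cursor.close()
+connection.close()
+```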
+
+## Notes
+
+### CrateDB initial settings
+
+CrateDB stores the shard indexes on the file system by mapping a file into memory (mmap).
+You might need to set `max_map_count` to something higher than the usual default, like `262144`.
+
+You can do this by running `sysctl -w vm.max_map_count=262144`.
+For more information, see the [bootstrap checks](https://cratedb.com/docs/guide/admin/bootstrap-checks.html#linux).
+
+### Mock API call
+
+If you don't want to register for the weather API we use, you can use the provided function
+`mocked_fetch_weather_data`; call it instead in the scheduler call.
+
+This is what it would look like:
+
+```python
+scheduler.enter(
+    RUN_EVERY_SECONDS,
+    1,
+    schedule_every,
+    (RUN_EVERY_SECONDS, mocked_fetch_weather_data, scheduler)
+)
+```
+
+*After changing this, re-build the Docker images.*
+
+### Initial kafka topic
+
+In this example the `Kafka` topic is only created when the first record is sent to it. Because of
+this, the Flink job could fail if it exceeds the default timeout (60 seconds); this should only
+happen if the API takes too long to respond *the very first time this project runs*.
+
+To solve this, you should [configure](https://kafka.apache.org/quickstart#quickstart_createtopic)
+the topics at boot time. This is recommended for production scenarios.
+
+If you are just testing things, you can solve this by re-running `docker compose up -d`; it will
+only restart `flink_job`, and assuming everything went OK, the topic should already exist and
+work as expected.
+
+If it still fails, check whether any other container/service is down;
+it could be a symptom of a wrong API token or an unresponsive Kafka server, for example.
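+
+If you prefer to create the topic up front from Python, this is a minimal sketch using
+`kafka-python`'s admin client. The topic name matches `.env`, and it assumes the script runs
+inside the Compose network, where the broker is reachable as `kafka:9092`:
+
+```python
+from kafka.admin import KafkaAdminClient, NewTopic
+
+# Create the topic before the first message is produced, so the Flink job
+# does not have to wait for it to be auto-created.
+admin = KafkaAdminClient(bootstrap_servers='kafka:9092')
+admin.create_topics([NewTopic(name='weather_topic', num_partitions=1, replication_factor=1)])
+admin.close()
+```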
+
+## Data and schema
+
+See `example.json` for the schema. As you can see in `weather_producer` and `flink_consumer`,
+schema manipulation is minimal:
+thanks to CrateDB's dynamic objects, we only need to map the `location` and `current` keys.
+
+For more information on dynamic objects,
+see [Handling Dynamic Objects in CrateDB](https://cratedb.com/blog/handling-dynamic-objects-in-cratedb).
+
+In `weather_producer` the `Kafka` producer directly serializes the JSON into a string.
+
+```python
+KafkaProducer(bootstrap_servers=BOOTSTRAP_SERVER,
+              value_serializer=lambda m: json.dumps(m).encode('utf-8'))
+```
+
+In `flink_consumer` we use a JSON deserializer and only specify the two main keys,
+`location` and `current`:
+
+```python
+row_type_info = Types.ROW_NAMED(['location', 'current'], [Types.STRING(), Types.STRING()])
+json_format = JsonRowDeserializationSchema.builder().type_info(row_type_info).build()
+```
+
+If your format is not JSON, or if you want to specify the whole schema, adapt it as needed.
+
+[Here](https://nightlies.apache.org/flink/flink-docs-master/api/python/examples/datastream/connectors.html)
+you can find examples of other formats like `csv` or `avro`.
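+
+The sink table itself is not created by the Flink job and is assumed to exist in CrateDB.
+This is a minimal sketch to create it, where the two dynamic `OBJECT` columns are an
+assumption based on `example.json` and the dynamic-objects approach described above:
+
+```python
+from crate import client
+
+connection = client.connect('http://localhost:4200', username='crate')
+cursor = connection.cursor()
+# One dynamic OBJECT column per top-level key of the payload.
+cursor.execute("""
+    CREATE TABLE IF NOT EXISTS doc.weather_flink_sink (
+        location OBJECT(DYNAMIC),
+        current OBJECT(DYNAMIC)
+    )
+""")
+cursor.close()
+connection.close()
+```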
+
+## Jars and versions
+
+Jars are downloaded at build time to `/app/jars`; versions are pinned in `.env`.
+
+There is a `JARS_PATH` in `flink_consumer`; change it if you keep the jars somewhere else.
diff --git a/application/apache-kafka-flink-streaming/docker-compose.yml b/application/apache-kafka-flink-streaming/docker-compose.yml
new file mode 100644
index 00000000..d3933589
--- /dev/null
+++ b/application/apache-kafka-flink-streaming/docker-compose.yml
@@ -0,0 +1,58 @@
+services:
+  weather_producer:
+    env_file:
+      - .env
+    build:
+      context: .
+      dockerfile: Dockerfile
+    command: python -m weather_producer
+    depends_on:
+      - kafka
+
+  flink_job:
+    env_file:
+      - .env
+    build:
+      context: .
+      dockerfile: flink_job.Dockerfile
+      args:
+        - POSTGRESQL_JAR_URL=https://jdbc.postgresql.org/download/postgresql-${POSTGRESQL_JAR_VERSION}.jar
+        - FLINK_SQL_JAR_URL=https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/${FLINK_CONNECTOR_JDBC_VERSION}/flink-connector-jdbc-${FLINK_CONNECTOR_JDBC_VERSION}.jar
+        - FLINK_KAFKA_JAR_URL=https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/${FLINK_KAFKA_JAR_URL_VERSION}/flink-sql-connector-kafka-${FLINK_KAFKA_JAR_URL_VERSION}.jar
+    command: python -m flink_consumer
+    depends_on:
+      - kafka
+
+  crate:
+    image: crate:latest
+    ports:
+      - "4200:4200"
+    command: [ "crate",
+               "-Cdiscovery.type=single-node",
+    ]
+    environment:
+      - CRATE_HEAP_SIZE=2g
+
+  zookeeper:
+    image: confluentinc/cp-zookeeper:6.2.0
+    hostname: zookeeper
+    ports:
+      - "2181:2181"
+    environment:
+      ZOOKEEPER_CLIENT_PORT: 2181
+      ZOOKEEPER_TICK_TIME: 2000
+
+  kafka:
+    image: confluentinc/cp-server:6.2.0
+    depends_on:
+      - zookeeper
+    ports:
+      - "9092:9092"
+      - "9101:9101"
+    environment:
+      KAFKA_BROKER_ID: 1
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
+      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
diff --git a/application/apache-kafka-flink-streaming/example.json b/application/apache-kafka-flink-streaming/example.json
new file mode 100644
index 00000000..57a82e91
--- /dev/null
+++ b/application/apache-kafka-flink-streaming/example.json
@@ -0,0 +1,41 @@
+{
+  "location": {
+    "localtime": "2024-03-07 18:20",
+    "country": "France",
+    "localtime_epoch": 1709832024,
+    "name": "Nonette",
+    "lon": 3.28,
+    "region": "Auvergne",
+    "lat": 45.48,
+    "tz_id": "Europe/Paris"
+  },
+  "current": {
+    "feelslike_c": 11,
+    "uv": 3,
+    "last_updated": "2024-03-07 18:15",
+    "feelslike_f": 51.7,
+    "wind_degree": 30,
+    "last_updated_epoch": 1709831700,
+    "is_day": 1,
+    "precip_in": 0,
+    "wind_dir": "NNE",
+    "gust_mph": 12.1,
+    "temp_c": 12,
+    "pressure_in": 29.83,
+    "gust_kph": 19.5,
+    "temp_f": 53.6,
+    "precip_mm": 0,
+    "cloud": 0,
+    "wind_kph": 6.8,
+    "condition": {
+      "code": 1000,
+      "icon": "//cdn.weatherapi.com/weather/64x64/day/113.png",
+      "text": "Sunny"
+    },
+    "wind_mph": 4.3,
+    "vis_km": 10,
+    "humidity": 50,
+    "pressure_mb": 1010,
+    "vis_miles": 6
+  }
+}
\ No newline at end of file
diff --git a/application/apache-kafka-flink-streaming/flink_consumer.py b/application/apache-kafka-flink-streaming/flink_consumer.py
new file mode 100644
index 00000000..9406177c
--- /dev/null
+++ b/application/apache-kafka-flink-streaming/flink_consumer.py
@@ -0,0 +1,67 @@
+import os
+import logging
+
+from pathlib import Path
+
+from pyflink.common import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.datastream.connectors.jdbc import JdbcSink, JdbcConnectionOptions, JdbcExecutionOptions
+from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer
+from pyflink.datastream.formats.json import JsonRowDeserializationSchema
+
+logging.basicConfig(
+    format='[%(asctime)s] [%(levelname)s] [%(filename)s:%(lineno)d] - %(message)s',
+    level=logging.DEBUG
+)
+
+JARS_PATH = Path(__file__).parent / 'jars'
+
+KAFKA_BOOTSTRAP_SERVER = os.getenv('FLINK_CONSUMER_BOOTSTRAP_SERVER')
+KAFKA_TOPIC = os.getenv('FLINK_CONSUMER_KAFKA_TOPIC')
+CRATEDB_PG_URI = os.getenv('FLINK_CONSUMER_CRATEDB_PG_URI', 'jdbc:postgresql://localhost:5432/crate')
+CRATEDB_USER = os.getenv('FLINK_CONSUMER_CRATEDB_USER')
+CRATEDB_PASSWORD = os.getenv('FLINK_CONSUMER_CRATEDB_PASSWORD')
+
+
+def kafka_to_cratedb(env: StreamExecutionEnvironment):
+    row_type_info = Types.ROW_NAMED(['location', 'current'], [Types.STRING(), Types.STRING()])
+    json_format = JsonRowDeserializationSchema.builder().type_info(row_type_info).build()
+
+    # Consumes data from Kafka.
+    kafka_consumer = FlinkKafkaConsumer(
+        topics=KAFKA_TOPIC,
+        deserialization_schema=json_format,
+        properties={'bootstrap.servers': f'{KAFKA_BOOTSTRAP_SERVER}:9092'}
+    )
+    kafka_consumer.set_start_from_latest()
+
+    ds = env.add_source(kafka_consumer, source_name='kafka')
+
+    # Writes data to CrateDB, flushing every second or every 200 rows, whichever comes first.
+    ds.add_sink(
+        JdbcSink.sink(
+            "insert into doc.weather_flink_sink (location, current) values (?, ?)",
+            row_type_info,
+            JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
+            .with_url(CRATEDB_PG_URI)
+            .with_driver_name('org.postgresql.Driver')
+            .with_user_name(CRATEDB_USER)
+            .with_password(CRATEDB_PASSWORD)
+            .build(),
+            JdbcExecutionOptions.builder()
+            .with_batch_interval_ms(1000)
+            .with_batch_size(200)
+            .with_max_retries(5)
+            .build()
+        )
+    )
+    env.execute()
+
+
+if __name__ == '__main__':
+    env = StreamExecutionEnvironment.get_execution_environment()
+    jars = [f'file://{jar}' for jar in JARS_PATH.glob('*.jar')]
+    env.add_jars(*jars)
+
+    logging.info("Reading data from Kafka")
+    kafka_to_cratedb(env)
diff --git a/application/apache-kafka-flink-streaming/flink_job.Dockerfile b/application/apache-kafka-flink-streaming/flink_job.Dockerfile
new file mode 100644
index 00000000..53ad99fd
--- /dev/null
+++ b/application/apache-kafka-flink-streaming/flink_job.Dockerfile
@@ -0,0 +1,18 @@
+FROM python:3.10
+# The Python version is important: as of today (2024-03-04) apache-flink is only
+# supported for python<=3.10.
+
+ARG POSTGRESQL_JAR_URL
+ARG FLINK_SQL_JAR_URL
+ARG FLINK_KAFKA_JAR_URL
+
+WORKDIR /app
+COPY * /app
+RUN wget ${POSTGRESQL_JAR_URL} --directory-prefix=/app/jars
+RUN wget ${FLINK_SQL_JAR_URL} --directory-prefix=/app/jars
+RUN wget ${FLINK_KAFKA_JAR_URL} --directory-prefix=/app/jars
+
+RUN apt-get update && apt-get install -y openjdk-11-jdk
+RUN pip install poetry
+
+RUN poetry config virtualenvs.create false && poetry install
diff --git a/application/apache-kafka-flink-streaming/pyproject.toml b/application/apache-kafka-flink-streaming/pyproject.toml
new file mode 100644
index 00000000..15b7507d
--- /dev/null
+++ b/application/apache-kafka-flink-streaming/pyproject.toml
@@ -0,0 +1,16 @@
+[tool.poetry]
+name = "cratedb-weather-data"
+version = "0.1.0"
+description = ""
+authors = ["ivan.sanchez "]
+readme = "README.md"
+
+[tool.poetry.dependencies]
+python = "^3.9"
+requests = "^2.31.0"
+kafka-python = "^2.0.2"
+apache-flink = "^1.18.1"
+
+[build-system]
+requires = ["poetry-core"]
+build-backend = "poetry.core.masonry.api"
diff --git a/application/apache-kafka-flink-streaming/requirements.txt b/application/apache-kafka-flink-streaming/requirements.txt
new file mode 100644
index 00000000..b19c7ae6
--- /dev/null
+++ b/application/apache-kafka-flink-streaming/requirements.txt
@@ -0,0 +1,44 @@
+apache-beam==2.48.0
+apache-flink==1.18.1
+apache-flink-libraries==1.18.1
+avro-python3==1.10.2
+certifi==2024.2.2
+charset-normalizer==3.3.2
+cloudpickle==2.2.1
+crate==0.34.0
+crcmod==1.7
+dill==0.3.1.1
+dnspython==2.6.1
+docopt==0.6.2
+fastavro==1.9.4
+fasteners==0.19
+find-libpython==0.3.1
+geojson==3.1.0
+greenlet==3.0.3
+grpcio==1.62.0
+hdfs==2.7.3
+httplib2==0.22.0
+idna==3.6
+kafka-python==2.0.2
+numpy==1.24.4
+objsize==0.6.1
+orjson==3.9.15
+pandas==2.2.1
+pemja==0.3.0
+proto-plus==1.23.0
+protobuf==4.23.4
+py4j==0.10.9.7
+pyarrow==11.0.0
+pydot==1.4.2
+pymongo==4.6.2
+pyparsing==3.1.2
+python-dateutil==2.9.0.post0
+pytz==2024.1
+regex==2023.12.25
+requests==2.31.0
+six==1.16.0
+SQLAlchemy==2.0.28
+typing_extensions==4.10.0
+tzdata==2024.1
+urllib3==2.0.7
+zstandard==0.22.0
diff --git a/application/apache-kafka-flink-streaming/weather_producer.py b/application/apache-kafka-flink-streaming/weather_producer.py
new file mode 100644
index 00000000..95141423
--- /dev/null
+++ b/application/apache-kafka-flink-streaming/weather_producer.py
@@ -0,0 +1,74 @@
+import json
+import logging
+import os
+import sched
+import time
+import functools
+
+import requests
+
+from kafka import KafkaProducer
+
+logging.basicConfig(
+    format='[%(asctime)s] [%(levelname)s] [%(filename)s:%(lineno)d] - %(message)s',
+    level=logging.DEBUG
+)
+logger = logging.getLogger(__name__)
+
+CRATEDB_HOST = os.getenv('CRATEDB_HOST', 'crate') + ':' + os.getenv('CRATEDB_PORT', '4200')
+API_KEY = os.getenv('WEATHER_PRODUCER_API_KEY')
+CITY = os.getenv('WEATHER_PRODUCER_CITY')
+
+WEATHER_URI = f'https://api.weatherapi.com/v1/current.json?key={API_KEY}&q={CITY}&aqi=no'
+
+RUN_EVERY_SECONDS = int(os.getenv('WEATHER_PRODUCER_FETCH_EVERY_SECONDS', 5))
+BOOTSTRAP_SERVER = os.getenv('WEATHER_PRODUCER_KAFKA_BOOTSTRAP_SERVER')
+KAFKA_TOPIC = os.getenv('WEATHER_PRODUCER_KAFKA_TOPIC')
+
+producer = KafkaProducer(bootstrap_servers=BOOTSTRAP_SERVER,
+                         value_serializer=lambda m: json.dumps(m).encode('utf-8'))
+
+
+@functools.cache
+def mocked_fetch_weather_data():
+    with open('example.json') as f:
+        return json.loads(f.read())
+
+
+def fetch_weather_data(api_uri) -> dict:
+    response = requests.get(api_uri)
+    response.raise_for_status()
+    return response.json()
+
+
+def send_to_kafka(topic: str, producer) -> None:
+    data = fetch_weather_data(WEATHER_URI)
+    producer.send(topic, value=data)
+    producer.flush()
+
+
+def schedule_every(seconds, func, scheduler):
+    # Schedule the next call first, then do the actual work.
+    scheduler.enter(seconds, 1, schedule_every, (RUN_EVERY_SECONDS, func, scheduler))
+    func()
+
+
+def main():
+    try:
+        produce_weather_data = functools.partial(send_to_kafka, KAFKA_TOPIC, producer)
+        logger.debug(f'Starting scheduler, will run every {RUN_EVERY_SECONDS} seconds')
+        scheduler = sched.scheduler(time.time, time.sleep)
+        scheduler.enter(
+            RUN_EVERY_SECONDS,
+            1,
+            schedule_every,
+            (RUN_EVERY_SECONDS, produce_weather_data, scheduler)
+        )
+        scheduler.run()
+
+    except KeyboardInterrupt:
+        logger.info('Exit: KeyboardInterrupt')
+
+
+if __name__ == '__main__':
+    main()