A collection of examples demonstrating Apache Flink™'s Python API (PyFlink), updated to use modern APIs and run within a self-contained Docker environment.
These examples primarily use the PyFlink Table API, showcasing common patterns for batch processing.
Running PyFlink applications typically requires a Java runtime environment, Python, and specific dependencies (like PyFlink itself) to be available to the Flink cluster nodes. To simplify setup and ensure consistency, this project uses Docker Compose to manage a local Flink cluster built from a custom Dockerfile.
Here's the workflow:
- You use `docker compose up --build` (implicitly handled by `make start-flink` if the image doesn't exist) to build a custom Flink Docker image based on the included `Dockerfile`. This image installs Python 3 and the necessary Python packages (`apache-flink`, `numpy`) on top of the official Flink image.
- Docker Compose then starts Flink JobManager and TaskManager containers using this custom image.
- The project directory is mounted as a volume inside these containers, making your Python scripts accessible to Flink.
- You use `make run`, which executes `flink run` commands inside the JobManager container.
- The `flink run` command submits your Python scripts (`.py` files) to the Flink cluster.
- Flink executes the Python scripts using the Python 3 environment built into the Docker image, potentially distributing tasks to the TaskManager(s).
- Any output printed by the Python scripts (like results) appears in the standard output logs of the Flink TaskManager containers.
 
This approach ensures the correct Java, Python, and Python dependencies are present and avoids configuration issues related to finding the Python executable.

Prerequisites:

- Docker and Docker Compose
- Python 3.6+ (Optional, for local utilities if desired)
- uv (Optional, for local setup)
- Homebrew (Optional, on macOS, used by the Makefile to install `uv` if not found)

- `Dockerfile`:
  - Starts from the official `flink:1.19.0-scala_2.12-java11` image.
  - Installs `python3` and `python3-pip` using `apt-get`.
  - Copies `requirements.txt`.
  - Installs Python dependencies (`apache-flink`, `numpy`) using `pip3`.
- `docker-compose.yml`:
  - Defines `jobmanager` and `taskmanager` services.
  - Uses `build: .` to instruct Docker Compose to build the image using the `Dockerfile` in the current directory.
  - Exposes port `8081` for the Flink Web UI.
  - Sets basic Flink configuration.
  - Mounts the current project directory (`.`) to `/opt/flink/usrlib` inside the containers.
  - Connects the services via a `flink-network`.
 
- Clone the repository: run `git clone https://github.com/wdm0006/flink-python-examples.git`, then `cd flink-python-examples`.
- (Optional) Set up local Python environment: This step is optional for just running the examples via Docker. Run `make setup`, which uses `uv` to create a `.venv` and install Python dependencies locally.
- Build the Image and Start the Flink Cluster: The first time you run this, Docker Compose will build the image defined in the `Dockerfile`. Subsequent runs will reuse the existing image unless the `Dockerfile` or its context changes. Run `make start-flink` (or directly: `docker compose up -d --build`). You can access the Flink Web UI at http://localhost:8081.
 
With the Flink cluster running, submit the example jobs with `make run`.

This command uses `docker compose exec jobmanager flink run -py <script_path_in_container>` for each example script. Flink uses the Python 3 environment built into the Docker image.

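
As a rough illustration of what `make run` does for each script, the sketch below assembles the same `docker compose exec jobmanager flink run -py ...` command in Python. The helper names are hypothetical and not part of this repository; only the `/opt/flink/usrlib` mount point and the shape of the command are taken from this README.

```python
import subprocess
from pathlib import PurePosixPath
from typing import List

# Mount point of the project directory inside the containers (per docker-compose.yml).
CONTAINER_ROOT = PurePosixPath("/opt/flink/usrlib")


def container_path(relative_script: str) -> str:
    """Map a script path relative to the project root to its path in the container."""
    return str(CONTAINER_ROOT / relative_script)


def flink_run_command(relative_script: str) -> List[str]:
    """Build the argv that submits one Python script from the JobManager container."""
    return [
        "docker", "compose", "exec", "jobmanager",
        "flink", "run", "-py", container_path(relative_script),
    ]


def submit(relative_script: str) -> None:
    """Run the command; this only works while the Flink cluster is up."""
    subprocess.run(flink_run_command(relative_script), check=True)
```

Calling `submit("word_count/word_count.py")` would then submit that script, assuming a file exists at that (illustrative) path inside the project directory.
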
Output: Check the Flink Web UI (http://localhost:8081) for running/completed jobs. View the stdout logs of the TaskManager(s) to see printed results.
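
Job status can also be checked from a script rather than the browser. The sketch below assumes Flink's REST API is served on the same port as the Web UI (the default) and reads the `/jobs/overview` endpoint. The function names are hypothetical, and the parsing helper only relies on the `name` and `state` fields of each job entry.

```python
import json
from typing import Dict, List, Tuple
from urllib.request import urlopen

# Web UI / REST port exposed by docker-compose.yml.
FLINK_REST_URL = "http://localhost:8081"


def summarize_jobs(overview: Dict) -> List[Tuple[str, str]]:
    """Return (name, state) pairs from a /jobs/overview response body."""
    return [(job["name"], job["state"]) for job in overview.get("jobs", [])]


def fetch_job_overview(base_url: str = FLINK_REST_URL) -> Dict:
    """Fetch the job overview from a running cluster."""
    with urlopen(f"{base_url}/jobs/overview", timeout=10) as response:
        return json.load(response)
```

With the cluster running, `summarize_jobs(fetch_job_overview())` would list each submitted job alongside its current state.
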
To submit a single example (e.g., word count), run `make submit_word_count`.

To stop the Flink cluster, run `make stop-flink` (or directly: `docker compose down`).

Included examples:

- Word Count: Counts word occurrences in a predefined string.
 - Trending Hashtags: Generates sample "tweets", extracts hashtags, and counts their frequency.
 - Data Enrichment: Reads sample JSON data and a CSV dimension table, joins them based on an attribute, and outputs the enriched data.
 - Mean Values: Generates sample floating-point data and calculates the mean of each column.
 - Mandelbrot Set: Generates candidate complex numbers and identifies points within the Mandelbrot set.
- Template Example: A basic skeleton (`template_example/application.py`) demonstrating the structure for a new PyFlink Table API job.
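
To give a feel for the pattern these examples share, here is a minimal sketch of a batch word count with the PyFlink Table API. It is not the repository's code: the input string, table name, and function names are assumptions made for illustration. It writes to a `print` connector sink, which is one way results can end up in the TaskManager's stdout as described earlier.

```python
from typing import List, Tuple

# Hypothetical input; the repository's example uses its own predefined string.
TEXT = "to be or not to be"


def to_rows(text: str) -> List[Tuple[str]]:
    """Split the text into one-column rows, one lowercase word per row."""
    return [(word,) for word in text.lower().split()]


def run_word_count(text: str = TEXT) -> None:
    """Count words with the Table API and write the result to a print sink."""
    # Imported lazily so to_rows() can be used without PyFlink installed.
    from pyflink.table import EnvironmentSettings, TableEnvironment
    from pyflink.table.expressions import col, lit

    t_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())
    words = t_env.from_elements(to_rows(text), ["word"])

    # The print connector writes each row to the TaskManager's stdout.
    t_env.execute_sql(
        "CREATE TABLE word_counts (word STRING, cnt BIGINT) "
        "WITH ('connector' = 'print')"
    )
    counts = words.group_by(col("word")).select(
        col("word"), lit(1).count.alias("cnt")
    )
    # Block until the batch job finishes.
    counts.execute_insert("word_counts").wait()
```

A script submitted with `flink run -py` would call `run_word_count()` at the end, for instance under an `if __name__ == "__main__":` guard.
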
To remove the local virtual environment (if created) and stop/remove the Flink cluster containers, run `make clean`.

Apache®, Apache Flink™, Flink™, and the Apache feather logo are trademarks of The Apache Software Foundation.