30 changes: 28 additions & 2 deletions FlinkDotNet/FlinkDotNet.JobGateway/Dockerfile
@@ -5,18 +5,29 @@ WORKDIR /app
EXPOSE 8080

FROM mcr.microsoft.com/dotnet/sdk:9.0 AS build

# Install Java and Maven for FlinkIRRunner build
RUN apt-get update && \
    apt-get install -y openjdk-17-jdk maven && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/*

# Set JAVA_HOME environment variable
ENV JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64
ENV PATH="${JAVA_HOME}/bin:${PATH}"

WORKDIR /src

# Copy project files
COPY ["FlinkDotNet/FlinkDotNet.JobGateway/FlinkDotNet.JobGateway.csproj", "FlinkDotNet/FlinkDotNet.JobGateway/"]
COPY ["FlinkDotNet/Flink.JobRunner/Flink.JobRunner.csproj", "FlinkDotNet/Flink.JobRunner/"]
COPY ["FlinkDotNet/Flink.JobBuilder/Flink.JobBuilder.csproj", "FlinkDotNet/Flink.JobBuilder/"]

# Restore dependencies
RUN dotnet restore "FlinkDotNet/FlinkDotNet.JobGateway/FlinkDotNet.JobGateway.csproj"

# Copy source code (including FlinkIRRunner for Java build)
COPY FlinkDotNet/ FlinkDotNet/
COPY FlinkIRRunner/ FlinkIRRunner/

# Build
WORKDIR "/src/FlinkDotNet/FlinkDotNet.JobGateway"
@@ -26,9 +37,24 @@ FROM build AS publish
RUN dotnet publish "FlinkDotNet.JobGateway.csproj" -c Release -o /app/publish /p:UseAppHost=false

FROM base AS final

# Install Java and Maven for on-demand JAR building capability
# This must be done before switching to non-root user
RUN apt-get update && \
    apt-get install -y openjdk-17-jdk maven && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/*

# Set JAVA_HOME environment variable
ENV JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64
ENV PATH="${JAVA_HOME}/bin:${PATH}"

WORKDIR /app
COPY --from=publish /app/publish .

# Copy FlinkIRRunner source for on-demand building
COPY --from=build /src/FlinkIRRunner /app/FlinkIRRunner

# Run as non-root user for security
USER app

98 changes: 98 additions & 0 deletions ReleasePackagesTesting.Published/NativeFlinkJob/README.md
@@ -0,0 +1,98 @@
# Native Flink Kafka Job - Infrastructure Validation

This is a standalone Apache Flink job using the official Flink Kafka connector to validate that the Aspire LocalTesting infrastructure is correctly configured.

## Purpose

Before debugging Gateway/IR issues, we need to prove the infrastructure works with a standard Flink job:
- ✅ Aspire DCP correctly configures Flink cluster
- ✅ Kafka is accessible from Flink containers at `kafka:9093`
- ✅ Messages flow through: Kafka Input → Flink Transform → Kafka Output

## Build

```bash
cd LocalTesting/NativeFlinkJob
mvn clean package
```

This creates: `target/native-flink-kafka-job-1.0.0.jar`

## Run via Flink REST API

```bash
# Upload JAR
curl -X POST -H "Expect:" -F "jarfile=@target/native-flink-kafka-job-1.0.0.jar" \
  http://localhost:8081/jars/upload

# Submit job (replace {jarId} with the ID from upload response)
curl -X POST http://localhost:8081/jars/{jarId}/run \
  -H "Content-Type: application/json" \
  -d '{
    "entryClass": "com.flinkdotnet.NativeKafkaJob",
    "programArgsList": [
      "--bootstrap-servers", "kafka:9093",
      "--input-topic", "lt.native.input",
      "--output-topic", "lt.native.output",
      "--group-id", "native-test-consumer"
    ],
    "parallelism": 1
  }'
```
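The two calls above can be chained in a small helper. This is only a sketch: it assumes the `/jars/upload` response is JSON whose `filename` field ends with the uploaded jar's id (the value expected by `/jars/{jarId}/run`), which is worth verifying against your Flink version.

```shell
# extract_jar_id: pull the jar id (last path segment of "filename")
# out of a /jars/upload JSON response, using only sed (no jq needed).
extract_jar_id() {
  printf '%s' "$1" | sed -E 's/.*"filename" *: *"[^"]*\/([^"]+)".*/\1/'
}

# submit_native_job: run the uploaded jar by id (endpoint and payload
# mirror the curl example above; localhost:8081 is an assumption).
submit_native_job() {
  curl -s -X POST "http://localhost:8081/jars/$1/run" \
    -H "Content-Type: application/json" \
    -d '{"entryClass":"com.flinkdotnet.NativeKafkaJob","parallelism":1}'
}
```

Typical use: `jar_id=$(extract_jar_id "$(curl -s -X POST -H "Expect:" -F "jarfile=@target/native-flink-kafka-job-1.0.0.jar" http://localhost:8081/jars/upload)")` followed by `submit_native_job "$jar_id"`.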

## Test with C#

The `FlinkNativeKafkaInfrastructureTest.cs` integration test:
1. Starts Aspire infrastructure (Kafka + Flink)
2. Builds and submits this native JAR
3. Produces test messages
4. Verifies messages are transformed and consumed

- If this test **PASSES**: the infrastructure is correct; debug the Gateway next
- If this test **FAILS**: fix the infrastructure first

## Configuration

Default values (for LocalTesting environment):
- **Bootstrap Servers**: `kafka:9093` (Aspire DCP internal listener)
- **Input Topic**: `lt.native.input`
- **Output Topic**: `lt.native.output`
- **Group ID**: `native-flink-consumer`

Override with command-line args:
```bash
--bootstrap-servers kafka:9093
--input-topic my-input
--output-topic my-output
--group-id my-consumer-group
```
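For scripted submissions, these overrides map one-to-one onto the `programArgsList` array shown in the REST example. A tiny formatter (a sketch; the argument names match this job's CLI flags, the values are whatever you pass in):

```shell
# build_program_args: turn config values into the programArgsList JSON array
# for /jars/{jarId}/run.
# $1 bootstrap servers, $2 input topic, $3 output topic, $4 group id
build_program_args() {
  printf '["--bootstrap-servers","%s","--input-topic","%s","--output-topic","%s","--group-id","%s"]' \
    "$1" "$2" "$3" "$4"
}
```

For example, `build_program_args kafka:9093 lt.native.input lt.native.output native-test-consumer` reproduces the argument list from the submit example above.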

## Key Differences from FlinkJobRunner

1. **Uses the official Flink Kafka connector** (`flink-connector-kafka`), not raw Kafka clients
2. **Proper dependency management** - connector packaged in fat JAR
3. **Standard Flink APIs** - `KafkaSource` and `KafkaSink` builders
4. **No IR/JSON** - direct Java code, no intermediate representation

## Troubleshooting

**Build fails with missing dependencies**:
- Ensure Maven can reach Maven Central
- Check Flink version compatibility (2.1.0)

**Job fails to start**:
- Check Flink JobManager logs: `docker logs flink-jobmanager`
- Verify bootstrap servers are accessible from Flink container

**No messages consumed**:
- Check Kafka topics exist
- Verify bootstrap servers (`kafka:9093` for containers, `localhost:{port}` for host)
- Check Flink job is in RUNNING state
- Look for exceptions in TaskManager logs
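Checking the RUNNING state can be scripted against the REST API. The parser below assumes each entry in the `/jobs/overview` response carries a `"state":"..."` field, which matches the documented response shape but should be verified against your Flink version:

```shell
# job_states: print the state of every job in a /jobs/overview response,
# one per line (e.g. RUNNING, FAILED).
job_states() {
  printf '%s' "$1" | grep -oE '"state" *: *"[A-Z]+"' | sed -E 's/.*"([A-Z]+)"/\1/'
}
```

Typical use: `job_states "$(curl -s http://localhost:8081/jobs/overview)"` should print `RUNNING` for a healthy job.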

## Next Steps After Validation

Once this job works:
1. Compare its Kafka configuration with Gateway's IR-generated config
2. Identify what Gateway does differently
3. Fix Gateway to match working configuration
126 changes: 126 additions & 0 deletions ReleasePackagesTesting.Published/NativeFlinkJob/pom.xml
@@ -0,0 +1,126 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.flinkdotnet</groupId>
    <artifactId>native-flink-kafka-job</artifactId>
    <version>1.0.0</version>
    <packaging>jar</packaging>
    <name>Native Flink Kafka Job</name>
    <description>Native Apache Flink job to validate infrastructure setup</description>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>17</java.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
        <flink.version>2.1.0</flink.version>
        <kafka.version>3.7.0</kafka.version>
        <slf4j.version>1.7.36</slf4j.version>
    </properties>

    <dependencies>
        <!-- Flink Core Dependencies (provided by cluster) -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Flink Connector Base (required for Flink 2.x) -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Flink Kafka connector (bundled in the fat JAR; provides the KafkaSource/KafkaSink
             builders used by this job). The connector is released separately from Flink core;
             4.0.0-2.0 targets Flink 2.0.x, so verify compatibility with the Flink version above. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>4.0.0-2.0</version>
        </dependency>

        <!-- Kafka Clients (bundled in JAR, same as FlinkJobRunner) -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.9.1</version>
        </dependency>

        <!-- Logging -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.11.0</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>

            <!-- Shade plugin to create a fat JAR with dependencies -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.5.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <!-- Keep the dependency-reduced POM; replace the main artifact
                                 instead of attaching the shaded JAR under a classifier -->
                            <createDependencyReducedPom>true</createDependencyReducedPom>
                            <shadedArtifactAttached>false</shadedArtifactAttached>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:flink-shaded-force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>org.apache.logging.log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                        <!-- Exclude module-info to avoid strong encapsulation warnings -->
                                        <exclude>module-info.class</exclude>
                                        <!-- Only keep one MANIFEST.MF (from this project) -->
                                        <exclude>META-INF/MANIFEST.MF</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <!-- Create a new MANIFEST.MF carrying the main class -->
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.flinkdotnet.NativeKafkaJob</mainClass>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>