Skip to content

Commit 8d10849

Browse files
committed
adjust to review comments by @Abacn
1 parent d2b8b2f commit 8d10849

File tree

7 files changed

+33
-68
lines changed

7 files changed

+33
-68
lines changed

.gitignore

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,6 @@ sdks/java/maven-archetypes/examples-java8/src/main/resources/archetype-resources
3434
sdks/java/maven-archetypes/gcp-bom-examples/src/main/resources/archetype-resources/src/
3535
sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/sample.txt
3636

37-
# Ignore generated debezium data
38-
sdks/java/io/debezium/data/
39-
4037
# Ignore files generated by the Python build process.
4138
**/*.pyc
4239
**/*.pyo

CHANGES.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
3232
## I/Os
3333
34-
* Debezium IO upgraded to 3.1.1 requires Java 17 (Java) ([#34747](https://github.com/apache/beam/issues/34747)).
34+
* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
3535
3636
## New Features / Improvements
3737
@@ -75,6 +75,7 @@
7575
## I/Os
7676

7777
* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
78+
* Debezium IO upgraded to 3.1.1 requires Java 17 (Java) ([#34747](https://github.com/apache/beam/issues/34747)).
7879

7980
## New Features / Improvements
8081
* Adding Google Storage Requests Pays feature (Golang)([#30747](https://github.com/apache/beam/issues/30747)).
@@ -196,7 +197,6 @@
196197
* [Python] Reshuffle now correctly respects user-specified type hints, fixing a previous bug where it might use FastPrimitivesCoder wrongly. This change could break pipelines with incorrect type hints in Reshuffle. If you have issues after upgrading, temporarily set update_compatibility_version to a previous Beam version to use the old behavior. The recommended solution is to fix the type hints in your code. ([#33932](https://github.com/apache/beam/pull/33932))
197198
* [Java] SparkReceiver 2 has been moved to SparkReceiver 3 that supports Spark 3.x. ([#33574](https://github.com/apache/beam/pull/33574))
198199
* [Python] Correct parsing of `collections.abc.Sequence` type hints was added, which can lead to pipelines failing type hint checks that were previously passing erroneously. These issues will be most commonly seen trying to consume a PCollection with a `Sequence` type hint after a GroupByKey or a CoGroupByKey. ([#33999](https://github.com/apache/beam/pull/33999)).
199-
* Debezium IO (Java) has been upgraded from depending on version 1.3.1.Final of io.debezium to 3.1.1.Final. This may cause some breaking changes since the libraries do not maintain full compatibility ([#33526](https://github.com/apache/beam/issues/33526)).
200200

201201
## Bugfixes
202202

buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -674,8 +674,8 @@ class BeamModulePlugin implements Plugin<Project> {
674674
activemq_junit : "org.apache.activemq.tooling:activemq-junit:$activemq_version",
675675
activemq_kahadb_store : "org.apache.activemq:activemq-kahadb-store:$activemq_version",
676676
activemq_mqtt : "org.apache.activemq:activemq-mqtt:$activemq_version",
677-
antlr : "org.antlr:antlr4:4.10",
678-
antlr_runtime : "org.antlr:antlr4-runtime:4.10",
677+
antlr : "org.antlr:antlr4:4.7",
678+
antlr_runtime : "org.antlr:antlr4-runtime:4.7",
679679
args4j : "args4j:args4j:2.33",
680680
auto_value_annotations : "com.google.auto.value:auto-value-annotations:$autovalue_version",
681681
// TODO: https://github.com/apache/beam/issues/34993 after stopping supporting Java 8

sdks/java/io/debezium/build.gradle

Lines changed: 9 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -19,23 +19,13 @@ import groovy.json.JsonOutput
1919

2020
plugins { id 'org.apache.beam.module' }
2121

22-
// The order is intended here - Debezium 3 requires Java 17, or later
23-
// Overwrite javaVersion global property if corresponding project property specified
24-
if (project.hasProperty('java17Home')) {
25-
javaVersion = "1.17"
26-
} else if (project.hasProperty('java21Home')) {
27-
javaVersion = "1.21"
28-
} else if (JavaVersion.VERSION_1_8.compareTo(JavaVersion.current()) < 0) {
29-
// Otherwise, compile the project with java11 spec
30-
javaVersion = "1.17"
31-
}
32-
3322
applyJavaNature(
3423
automaticModuleName: 'org.apache.beam.sdk.io.debezium',
3524
mavenRepositories: [
3625
[id: 'io.confluent', url: 'https://packages.confluent.io/maven/']
3726
],
3827
enableSpotbugs: false,
28+
requireJavaVersion: JavaVersion.VERSION_17,
3929
)
4030
provideIntegrationTestingDependencies()
4131

@@ -53,8 +43,6 @@ dependencies {
5343

5444
// Kafka connect dependencies
5545
implementation "org.apache.kafka:connect-api:3.9.0"
56-
implementation "org.apache.kafka:connect-json:3.9.0"
57-
permitUnusedDeclared "org.apache.kafka:connect-json:3.9.0"
5846

5947
// Debezium dependencies
6048
implementation group: 'io.debezium', name: 'debezium-core', version: '3.1.1.Final'
@@ -80,7 +68,15 @@ dependencies {
8068
testImplementation group: 'io.debezium', name: 'debezium-connector-postgres', version: '3.1.1.Final'
8169
}
8270

71+
// TODO: Remove pin after Beam has unpinned it
72+
// Pin the Antlr version
73+
configurations.all {
74+
resolutionStrategy {
75+
force 'org.antlr:antlr4:4.10', 'org.antlr:antlr4-runtime:4.10'
76+
}
77+
}
8378

79+
// TODO: Remove pin after upgrading Beam's Jackson version
8480
// Force Jackson versions for the test runtime classpath
8581
configurations.named("testRuntimeClasspath") {
8682
resolutionStrategy.force (
@@ -139,23 +135,6 @@ task integrationTest(type: Test, dependsOn: processTestResources) {
139135
}
140136
}
141137

142-
// The order is intended here - Debezium 3 requires Java 17, or later
143-
if (project.hasProperty('java17Home')) {
144-
project.tasks.withType(JavaCompile) {
145-
setJavaVerOptions(options, '17')
146-
}
147-
project.tasks.withType(Javadoc) {
148-
executable = project.findProperty('java17Home') + '/bin/javadoc'
149-
}
150-
} else if (project.hasProperty('java21Home')) {
151-
project.tasks.withType(JavaCompile) {
152-
setJavaVerOptions(options, '21')
153-
}
154-
project.tasks.withType(Javadoc) {
155-
executable = project.findProperty('java21Home') + '/bin/javadoc'
156-
}
157-
}
158-
159138
project.tasks.each {
160139
it.onlyIf {project.hasProperty('java17Home') || project.hasProperty('java21Home')
161140
|| JavaVersion.VERSION_17.compareTo(JavaVersion.current()) <= 0

sdks/java/io/debezium/expansion-service/build.gradle

Lines changed: 1 addition & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -20,22 +20,12 @@ apply plugin: 'org.apache.beam.module'
2020
apply plugin: 'application'
2121
mainClassName = "org.apache.beam.sdk.expansion.service.ExpansionService"
2222

23-
// The order is intended here - Debezium 3 requires Java 17, or later
24-
// Overwrite javaVersion global property if corresponding project property specified
25-
if (project.hasProperty('java17Home')) {
26-
javaVersion = "1.17"
27-
} else if (project.hasProperty('java21Home')) {
28-
javaVersion = "1.21"
29-
} else if (JavaVersion.VERSION_1_8.compareTo(JavaVersion.current()) < 0) {
30-
// Otherwise, compile the project with java11 spec
31-
javaVersion = "1.17"
32-
}
33-
3423
applyJavaNature(
3524
automaticModuleName: 'org.apache.beam.sdk.io.debezium.expansion.service',
3625
exportJavadoc: false,
3726
validateShadowJar: false,
3827
shadowClosure: {},
28+
requireJavaVersion: JavaVersion.VERSION_17,
3929
)
4030

4131
description = "Apache Beam :: SDKs :: Java :: IO :: Debezium :: Expansion Service"
@@ -57,23 +47,6 @@ dependencies {
5747
runtimeOnly group: 'io.debezium', name: 'debezium-connector-db2', version: debezium_version
5848
}
5949

60-
// The order is intended here - Debezium 3 requires Java 17, or later
61-
if (project.hasProperty('java17Home')) {
62-
project.tasks.withType(JavaCompile) {
63-
setJavaVerOptions(options, '17')
64-
}
65-
project.tasks.withType(Javadoc) {
66-
executable = project.findProperty('java17Home') + '/bin/javadoc'
67-
}
68-
} else if (project.hasProperty('java21Home')) {
69-
project.tasks.withType(JavaCompile) {
70-
setJavaVerOptions(options, '21')
71-
}
72-
project.tasks.withType(Javadoc) {
73-
executable = project.findProperty('java21Home') + '/bin/javadoc'
74-
}
75-
}
76-
7750
// Module classes requires JDK > 8
7851
project.tasks.each {
7952
it.onlyIf {project.hasProperty('java17Home') || project.hasProperty('java21Home')

sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOMySqlConnectorIT.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import static org.apache.beam.sdk.testing.SerializableMatchers.hasItem;
2222
import static org.hamcrest.MatcherAssert.assertThat;
2323
import static org.hamcrest.Matchers.equalTo;
24+
import static org.junit.jupiter.api.Assertions.fail;
2425

2526
import com.zaxxer.hikari.HikariConfig;
2627
import com.zaxxer.hikari.HikariDataSource;
@@ -110,7 +111,7 @@ public static DataSource getMysqlDatasource(Void unused) {
110111
return new HikariDataSource(hikariConfig);
111112
}
112113

113-
private void monitorEssentialMetrics() {
114+
private void monitorEssentialMetrics() throws SQLException {
114115
DataSource ds = getMysqlDatasource(null);
115116
try {
116117
Connection conn = ds.getConnection();
@@ -128,6 +129,7 @@ private void monitorEssentialMetrics() {
128129
}
129130
} catch (SQLException ex) {
130131
LOG.error("SQL error in monitoring thread. Shutting down.", ex);
132+
throw (ex);
131133
} catch (InterruptedException ex) {
132134
LOG.info("Monitoring thread interrupted. Shutting down.");
133135
Thread.currentThread().interrupt();
@@ -207,7 +209,16 @@ public void testDebeziumSchemaTransformMysqlRead() throws InterruptedException {
207209
return null;
208210
});
209211
Thread writeThread = new Thread(() -> writePipeline.run().waitUntilFinish());
210-
Thread monitorThread = new Thread(this::monitorEssentialMetrics);
212+
Thread monitorThread =
213+
new Thread(
214+
() -> {
215+
try {
216+
monitorEssentialMetrics();
217+
} catch (SQLException e) {
218+
e.printStackTrace();
219+
fail("Failed because of SQLException in monitorEssentialMetrics!");
220+
}
221+
});
211222
monitorThread.start();
212223
writeThread.start();
213224

sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformTest.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.hamcrest.Matchers;
3434
import org.junit.ClassRule;
3535
import org.junit.Test;
36+
import org.junit.rules.TemporaryFolder;
3637
import org.junit.runner.RunWith;
3738
import org.junit.runners.Parameterized;
3839
import org.testcontainers.containers.Container;
@@ -44,6 +45,8 @@
4445
@RunWith(Parameterized.class)
4546
public class DebeziumReadSchemaTransformTest {
4647

48+
@ClassRule public static TemporaryFolder tempFolder = new TemporaryFolder();
49+
4750
@ClassRule
4851
public static final PostgreSQLContainer<?> POSTGRES_SQL_CONTAINER =
4952
new PostgreSQLContainer<>(
@@ -109,7 +112,9 @@ private PTransform<PCollectionRowTuple, PCollectionRowTuple> makePtransform(
109112
Lists.newArrayList(
110113
"database.server.id=579676",
111114
"schema.history.internal=io.debezium.storage.file.history.FileSchemaHistory",
112-
"schema.history.internal.file.filename=data/schema_history.dat"))
115+
String.format(
116+
"schema.history.internal.file.filename=%s",
117+
tempFolder.getRoot().toPath().resolve("schema_history.dat"))))
113118
.build());
114119
}
115120

0 commit comments

Comments
 (0)