Skip to content

Commit 3c6c626

Browse files
authored
Kafka Connect: Add regex for property file match (#11303)
1 parent 17f1c4d commit 3c6c626

File tree

2 files changed

+29
-3
lines changed

2 files changed

+29
-3
lines changed

kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -406,6 +406,12 @@ public JsonConverter jsonConverter() {
406406
return jsonConverter;
407407
}
408408

409+
@VisibleForTesting
410+
static boolean checkClassName(String className) {
411+
return (className.matches(".*\\.ConnectDistributed.*")
412+
|| className.matches(".*\\.ConnectStandalone.*"));
413+
}
414+
409415
/**
410416
* This method attempts to load the Kafka Connect worker properties, which are not exposed to
411417
* connectors. It does this by parsing the Java command used to launch the worker, extracting the
@@ -422,9 +428,7 @@ private Map<String, String> loadWorkerProps() {
422428
String javaCmd = System.getProperty("sun.java.command");
423429
if (javaCmd != null && !javaCmd.isEmpty()) {
424430
List<String> args = Splitter.on(' ').splitToList(javaCmd);
425-
if (args.size() > 1
426-
&& (args.get(0).endsWith(".ConnectDistributed")
427-
|| args.get(0).endsWith(".ConnectStandalone"))) {
431+
if (args.size() > 1 && checkClassName(args.get(0))) {
428432
Properties result = new Properties();
429433
try (InputStream in = Files.newInputStream(Paths.get(args.get(1)))) {
430434
result.load(in);

kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/IcebergSinkConfigTest.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,4 +88,26 @@ public void testStringToList() {
8888

8989
@Test
9090
public void testStringWithParensToList() {}
91+
92+
@Test
93+
public void testCheckClassName() {
94+
Boolean result =
95+
IcebergSinkConfig.checkClassName("org.apache.kafka.connect.cli.ConnectDistributed");
96+
assertThat(result).isTrue();
97+
98+
result = IcebergSinkConfig.checkClassName("org.apache.kafka.connect.cli.ConnectStandalone");
99+
assertThat(result).isTrue();
100+
101+
result = IcebergSinkConfig.checkClassName("some.other.package.ConnectDistributed");
102+
assertThat(result).isTrue();
103+
104+
result = IcebergSinkConfig.checkClassName("some.other.package.ConnectStandalone");
105+
assertThat(result).isTrue();
106+
107+
result = IcebergSinkConfig.checkClassName("some.package.ConnectDistributedWrapper");
108+
assertThat(result).isTrue();
109+
110+
result = IcebergSinkConfig.checkClassName("org.apache.kafka.clients.producer.KafkaProducer");
111+
assertThat(result).isFalse();
112+
}
91113
}

0 commit comments

Comments
 (0)