diff --git a/fe/fe-core/src/main/java/com/starrocks/analysis/RoutineLoadDataSourceProperties.java b/fe/fe-core/src/main/java/com/starrocks/analysis/RoutineLoadDataSourceProperties.java index 764b8ee96c3d88..1fcf6c6d4867d6 100644 --- a/fe/fe-core/src/main/java/com/starrocks/analysis/RoutineLoadDataSourceProperties.java +++ b/fe/fe-core/src/main/java/com/starrocks/analysis/RoutineLoadDataSourceProperties.java @@ -56,6 +56,7 @@ public class RoutineLoadDataSourceProperties implements ParseNode { private static final ImmutableSet CONFIGURABLE_KAFKA_PROPERTIES_SET = new ImmutableSet.Builder() .add(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY) .add(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY) + .add(CreateRoutineLoadStmt.CONFLUENT_SCHEMA_REGISTRY_URL) .build(); private static final ImmutableSet CONFIGURABLE_PULSAR_PROPERTIES_SET = new ImmutableSet.Builder() @@ -77,6 +78,8 @@ public class RoutineLoadDataSourceProperties implements ParseNode { private List> pulsarPartitionInitialPositions = Lists.newArrayList(); @SerializedName(value = "customPulsarProperties") private Map customPulsarProperties = Maps.newHashMap(); + @SerializedName(value = "confluentSchemaRegistryUrl") + private String confluentSchemaRegistryUrl; private final NodePosition pos; @@ -113,6 +116,10 @@ public String getType() { return type; } + public String getConfluentSchemaRegistryUrl() { + return confluentSchemaRegistryUrl; + } + public List> getKafkaPartitionOffsets() { return kafkaPartitionOffsets; } @@ -151,7 +158,7 @@ private void checkDataSourceProperties() throws AnalysisException { private void checkKafkaProperties() throws AnalysisException { Optional optional = properties.keySet().stream().filter( entity -> !CONFIGURABLE_KAFKA_PROPERTIES_SET.contains(entity)).filter( - entity -> !entity.startsWith("property.")).findFirst(); + entity -> !entity.startsWith("property.") && !entity.startsWith("confluent.")).findFirst(); if (optional.isPresent()) { throw new AnalysisException(optional.get() + " is 
invalid kafka custom property"); } @@ -179,6 +186,8 @@ private void checkKafkaProperties() throws AnalysisException { // check custom properties CreateRoutineLoadStmt.analyzeKafkaCustomProperties(properties, customKafkaProperties); + + confluentSchemaRegistryUrl = properties.get(CreateRoutineLoadStmt.CONFLUENT_SCHEMA_REGISTRY_URL); } private void checkPulsarProperties() throws AnalysisException { diff --git a/fe/fe-core/src/main/java/com/starrocks/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/com/starrocks/load/routineload/KafkaRoutineLoadJob.java index a9bc70bb69b69f..2a188a485098c7 100644 --- a/fe/fe-core/src/main/java/com/starrocks/load/routineload/KafkaRoutineLoadJob.java +++ b/fe/fe-core/src/main/java/com/starrocks/load/routineload/KafkaRoutineLoadJob.java @@ -644,6 +644,8 @@ public void modifyDataSourceProperties(RoutineLoadDataSourceProperties dataSourc convertCustomProperties(true); } + confluentSchemaRegistryUrl = dataSourceProperties.getConfluentSchemaRegistryUrl(); + LOG.info("modify the data source properties of kafka routine load job: {}, datasource properties: {}", this.id, dataSourceProperties); } diff --git a/fe/fe-core/src/test/java/com/starrocks/analysis/AlterRoutineLoadStmtTest.java b/fe/fe-core/src/test/java/com/starrocks/analysis/AlterRoutineLoadStmtTest.java index 7c66f5d0002fe8..4b1741e883b63c 100644 --- a/fe/fe-core/src/test/java/com/starrocks/analysis/AlterRoutineLoadStmtTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/analysis/AlterRoutineLoadStmtTest.java @@ -107,7 +107,8 @@ public void testParser() { + "(\n" + "\"kafka_partitions\" = \"0, 1, 2\",\n" + "\"kafka_offsets\" = \"100, 200, 100\",\n" - + "\"property.group.id\" = \"group1\"\n" + + "\"property.group.id\" = \"group1\",\n" + + "\"confluent.schema.registry.url\" = \"https://key:password@addr\"\n" + ");"; List stmts = com.starrocks.sql.parser.SqlParser.parse(sql, 32); AlterRoutineLoadStmt stmt = (AlterRoutineLoadStmt)stmts.get(0); @@ -122,6 +123,7 @@ 
public void testParser() { Assert.assertEquals(1, stmt.getDataSourceProperties().getCustomKafkaProperties().size()); Assert.assertTrue(stmt.getDataSourceProperties().getCustomKafkaProperties().containsKey("group.id")); Assert.assertEquals(3, stmt.getDataSourceProperties().getKafkaPartitionOffsets().size()); + Assert.assertEquals("https://key:password@addr", stmt.getDataSourceProperties().getConfluentSchemaRegistryUrl()); } @Test