Skip to content

Commit

Permalink
[Enhancement] Support for alteration of confluent schema registry ad…
Browse files Browse the repository at this point in the history
…dr (StarRocks#21241)

This PR allows you to change the Confluent schema registry address for an existing routine load job. To change this property, use the following SQL:

ALTER ROUTINE LOAD FOR db1.label1
FROM Kafka
(
   "confluent.schema.registry.url"="https://key:password@registryserverAddr"
);
  • Loading branch information
zaorangyang authored Apr 12, 2023
1 parent fd611b2 commit daa7b03
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public class RoutineLoadDataSourceProperties implements ParseNode {
private static final ImmutableSet<String> CONFIGURABLE_KAFKA_PROPERTIES_SET = new ImmutableSet.Builder<String>()
.add(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY)
.add(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY)
.add(CreateRoutineLoadStmt.CONFLUENT_SCHEMA_REGISTRY_URL)
.build();

private static final ImmutableSet<String> CONFIGURABLE_PULSAR_PROPERTIES_SET = new ImmutableSet.Builder<String>()
Expand All @@ -77,6 +78,8 @@ public class RoutineLoadDataSourceProperties implements ParseNode {
private List<Pair<String, Long>> pulsarPartitionInitialPositions = Lists.newArrayList();
@SerializedName(value = "customPulsarProperties")
private Map<String, String> customPulsarProperties = Maps.newHashMap();
@SerializedName(value = "confluentSchemaRegistryUrl")
private String confluentSchemaRegistryUrl;

private final NodePosition pos;

Expand Down Expand Up @@ -113,6 +116,10 @@ public String getType() {
return type;
}

/**
 * Returns the Confluent schema registry URL supplied via the
 * {@code confluent.schema.registry.url} property, or {@code null}
 * when the property was not provided.
 */
public String getConfluentSchemaRegistryUrl() {
    return this.confluentSchemaRegistryUrl;
}

/**
 * Returns the list of (partition, offset) pairs parsed from the
 * {@code kafka_partitions} / {@code kafka_offsets} properties.
 */
public List<Pair<Integer, Long>> getKafkaPartitionOffsets() {
    return this.kafkaPartitionOffsets;
}
Expand Down Expand Up @@ -151,7 +158,7 @@ private void checkDataSourceProperties() throws AnalysisException {
private void checkKafkaProperties() throws AnalysisException {
Optional<String> optional = properties.keySet().stream().filter(
entity -> !CONFIGURABLE_KAFKA_PROPERTIES_SET.contains(entity)).filter(
entity -> !entity.startsWith("property.")).findFirst();
entity -> !entity.startsWith("property.") && !entity.startsWith("confluent.")).findFirst();
if (optional.isPresent()) {
throw new AnalysisException(optional.get() + " is invalid kafka custom property");
}
Expand Down Expand Up @@ -179,6 +186,8 @@ private void checkKafkaProperties() throws AnalysisException {

// check custom properties
CreateRoutineLoadStmt.analyzeKafkaCustomProperties(properties, customKafkaProperties);

confluentSchemaRegistryUrl = properties.get(CreateRoutineLoadStmt.CONFLUENT_SCHEMA_REGISTRY_URL);
}

private void checkPulsarProperties() throws AnalysisException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,8 @@ public void modifyDataSourceProperties(RoutineLoadDataSourceProperties dataSourc
convertCustomProperties(true);
}

confluentSchemaRegistryUrl = dataSourceProperties.getConfluentSchemaRegistryUrl();

LOG.info("modify the data source properties of kafka routine load job: {}, datasource properties: {}",
this.id, dataSourceProperties);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ public void testParser() {
+ "(\n"
+ "\"kafka_partitions\" = \"0, 1, 2\",\n"
+ "\"kafka_offsets\" = \"100, 200, 100\",\n"
+ "\"property.group.id\" = \"group1\"\n"
+ "\"property.group.id\" = \"group1\",\n"
+ "\"confluent.schema.registry.url\" = \"https://key:passwrod@addr\"\n"
+ ");";
List<StatementBase> stmts = com.starrocks.sql.parser.SqlParser.parse(sql, 32);
AlterRoutineLoadStmt stmt = (AlterRoutineLoadStmt)stmts.get(0);
Expand All @@ -122,6 +123,7 @@ public void testParser() {
Assert.assertEquals(1, stmt.getDataSourceProperties().getCustomKafkaProperties().size());
Assert.assertTrue(stmt.getDataSourceProperties().getCustomKafkaProperties().containsKey("group.id"));
Assert.assertEquals(3, stmt.getDataSourceProperties().getKafkaPartitionOffsets().size());
Assert.assertEquals("https://key:passwrod@addr", stmt.getDataSourceProperties().getConfluentSchemaRegistryUrl());
}

@Test
Expand Down

0 comments on commit daa7b03

Please sign in to comment.