Skip to content

Commit

Permalink
[BugFix] Fix an issue of modify confluent schema registry (#21552)
Browse files Browse the repository at this point in the history
PR #21241 supports modifying the schema registry address, but when modifying other parameters, FE fails to pass schema registry address parameters to BE, causing the task aborted. This PR fixes the problem.

Signed-off-by: Zaorang Yang <zaorangy@gmail.com>
(cherry picked from commit c9160c9)
  • Loading branch information
zaorangyang authored and wanpengfei-git committed Apr 13, 2023
1 parent 8e5f4ec commit 7061ce3
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,9 @@ private void checkKafkaProperties() throws AnalysisException {
// check custom properties
CreateRoutineLoadStmt.analyzeKafkaCustomProperties(properties, customKafkaProperties);

confluentSchemaRegistryUrl = properties.get(CreateRoutineLoadStmt.CONFLUENT_SCHEMA_REGISTRY_URL);
if (properties.containsKey(CreateRoutineLoadStmt.CONFLUENT_SCHEMA_REGISTRY_URL)) {
confluentSchemaRegistryUrl = properties.get(CreateRoutineLoadStmt.CONFLUENT_SCHEMA_REGISTRY_URL);
}
}

private void checkPulsarProperties() throws AnalysisException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -644,7 +644,9 @@ public void modifyDataSourceProperties(RoutineLoadDataSourceProperties dataSourc
convertCustomProperties(true);
}

confluentSchemaRegistryUrl = dataSourceProperties.getConfluentSchemaRegistryUrl();
if (dataSourceProperties.getConfluentSchemaRegistryUrl() != null) {
confluentSchemaRegistryUrl = dataSourceProperties.getConfluentSchemaRegistryUrl();
}

LOG.info("modify the data source properties of kafka routine load job: {}, datasource properties: {}",
this.id, dataSourceProperties);
Expand Down

0 comments on commit 7061ce3

Please sign in to comment.