Skip to content

Commit

Permalink
Add kafka source connection string validation in datastream update pa…
Browse files Browse the repository at this point in the history
…th (#825)

Add validation in datastream update path to reject request for incorrect source regex connection string.
  • Loading branch information
vmaheshw authored May 13, 2021
1 parent d411dbc commit f1111e5
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -135,16 +135,7 @@ public void initializeDatastream(Datastream stream, List<Datastream> allDatastre
.put(DatastreamMetadataConstants.IS_CONNECTOR_MANAGED_DESTINATION_KEY, Boolean.TRUE.toString());
}

// verify that the source regular expression can be compiled
KafkaConnectionString connectionString = KafkaConnectionString.valueOf(stream.getSource().getConnectionString());
try {
Pattern pattern = Pattern.compile(connectionString.getTopicName());
LOG.info("Successfully compiled topic name pattern {}", pattern);
} catch (PatternSyntaxException e) {
throw new DatastreamValidationException(
String.format("Regular expression in Datastream source connection string (%s) is ill-formatted.",
stream.getSource().getConnectionString()), e);
}
validateSourceConnectionString(stream);
}

@Override
Expand Down Expand Up @@ -250,6 +241,29 @@ public void handleDatastream(List<DatastreamGroup> datastreamGroups) {
LOG.info("handleDatastream: new datastream groups: {}", _partitionDiscoveryThreadMap.keySet());
}

@Override
public void validateUpdateDatastreams(List<Datastream> datastreams, List<Datastream> allDatastreams)
throws DatastreamValidationException {
super.validateUpdateDatastreams(datastreams, allDatastreams);
//validate connection string
for (Datastream datastream : datastreams) {
validateSourceConnectionString(datastream);
}
}

private void validateSourceConnectionString(Datastream stream) throws DatastreamValidationException {
// verify that the source regular expression can be compiled
KafkaConnectionString connectionString = KafkaConnectionString.valueOf(stream.getSource().getConnectionString());
try {
Pattern pattern = Pattern.compile(connectionString.getTopicName());
LOG.info("Successfully compiled topic name pattern {}", pattern);
} catch (PatternSyntaxException e) {
throw new DatastreamValidationException(
String.format("Regular expression in Datastream source connection string (%s) is ill-formatted.",
stream.getSource().getConnectionString()), e);
}
}

/**
* PartitionDiscoveryThread listens to Kafka partitions periodically using the _consumer.listTopic()
* to fetch the latest subscribed partitions for a given datastreamGroup
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,20 @@ public void testValidateDatastreamUpdatePausedPartitions() throws Exception {
coordinator.stop();
}

@Test
public void testValidateDatastreamUpdateWithBadSource() throws DatastreamValidationException {
String sourceRegex = "*Event*";
StringMap metadata = new StringMap();
metadata.put(DatastreamMetadataConstants.REUSE_EXISTING_DESTINATION_KEY, Boolean.FALSE.toString());
Datastream ds =
KafkaMirrorMakerConnectorTestUtils.createDatastream("testInitializeDatastreamWithBadSource", _broker,
sourceRegex, metadata);
KafkaMirrorMakerConnector connector =
new KafkaMirrorMakerConnector("testInitializeDatastreamWithBadSource", getDefaultConfig(Optional.empty()),
"testCluster");
Assert.assertThrows(DatastreamValidationException.class, () -> connector.validateUpdateDatastreams(Collections.singletonList(ds), Collections.emptyList()));
}

@Test
public void testMirrorMakerConnectorBasics() {
String yummyTopic = "YummyPizza";
Expand Down

0 comments on commit f1111e5

Please sign in to comment.