diff --git a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnector.java b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnector.java index bbbb0c519..3fcfad26e 100644 --- a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnector.java +++ b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnector.java @@ -135,16 +135,7 @@ public void initializeDatastream(Datastream stream, List allDatastre .put(DatastreamMetadataConstants.IS_CONNECTOR_MANAGED_DESTINATION_KEY, Boolean.TRUE.toString()); } - // verify that the source regular expression can be compiled - KafkaConnectionString connectionString = KafkaConnectionString.valueOf(stream.getSource().getConnectionString()); - try { - Pattern pattern = Pattern.compile(connectionString.getTopicName()); - LOG.info("Successfully compiled topic name pattern {}", pattern); - } catch (PatternSyntaxException e) { - throw new DatastreamValidationException( - String.format("Regular expression in Datastream source connection string (%s) is ill-formatted.", - stream.getSource().getConnectionString()), e); - } + validateSourceConnectionString(stream); } @Override @@ -250,6 +241,29 @@ public void handleDatastream(List datastreamGroups) { LOG.info("handleDatastream: new datastream groups: {}", _partitionDiscoveryThreadMap.keySet()); } + @Override + public void validateUpdateDatastreams(List datastreams, List allDatastreams) + throws DatastreamValidationException { + super.validateUpdateDatastreams(datastreams, allDatastreams); + //validate connection string + for (Datastream datastream : datastreams) { + validateSourceConnectionString(datastream); + } + } + + private void validateSourceConnectionString(Datastream stream) throws DatastreamValidationException { + // verify that the source regular expression can be compiled + KafkaConnectionString connectionString = KafkaConnectionString.valueOf(stream.getSource().getConnectionString()); + try { + Pattern pattern = Pattern.compile(connectionString.getTopicName()); + LOG.info("Successfully compiled topic name pattern {}", pattern); + } catch (PatternSyntaxException e) { + throw new DatastreamValidationException( + String.format("Regular expression in Datastream source connection string (%s) is ill-formatted.", + stream.getSource().getConnectionString()), e); + } + } + /** * PartitionDiscoveryThread listens to Kafka partitions periodically using the _consumer.listTopic() * to fetch the latest subscribed partitions for a given datastreamGroup diff --git a/datastream-kafka-connector/src/test/java/com/linkedin/datastream/connectors/kafka/mirrormaker/TestKafkaMirrorMakerConnector.java b/datastream-kafka-connector/src/test/java/com/linkedin/datastream/connectors/kafka/mirrormaker/TestKafkaMirrorMakerConnector.java index 3941093a9..3d55dc273 100644 --- a/datastream-kafka-connector/src/test/java/com/linkedin/datastream/connectors/kafka/mirrormaker/TestKafkaMirrorMakerConnector.java +++ b/datastream-kafka-connector/src/test/java/com/linkedin/datastream/connectors/kafka/mirrormaker/TestKafkaMirrorMakerConnector.java @@ -254,6 +254,20 @@ public void testValidateDatastreamUpdatePausedPartitions() throws Exception { coordinator.stop(); } + @Test + public void testValidateDatastreamUpdateWithBadSource() throws DatastreamValidationException { + String sourceRegex = "*Event*"; + StringMap metadata = new StringMap(); + metadata.put(DatastreamMetadataConstants.REUSE_EXISTING_DESTINATION_KEY, Boolean.FALSE.toString()); + Datastream ds = + KafkaMirrorMakerConnectorTestUtils.createDatastream("testInitializeDatastreamWithBadSource", _broker, + sourceRegex, metadata); + KafkaMirrorMakerConnector connector = + new KafkaMirrorMakerConnector("testInitializeDatastreamWithBadSource", getDefaultConfig(Optional.empty()), + "testCluster"); + Assert.assertThrows(DatastreamValidationException.class, () -> connector.validateUpdateDatastreams(Collections.singletonList(ds), Collections.emptyList())); + } + @Test public void testMirrorMakerConnectorBasics() { String yummyTopic = "YummyPizza";