Skip to content

Commit

Permalink
DBZ-4395 Add connector specific Debezium Connect REST Extension/s and…
Browse files Browse the repository at this point in the history
… move logic from UI backend to the Debezium Connect REST Extension (part 1 of the re-architecturing/re-factoring)

closes to https://issues.redhat.com/browse/DBZ-4395
  • Loading branch information
rk3rn3r authored and jpechane committed Sep 21, 2023
1 parent e341910 commit 32161c9
Show file tree
Hide file tree
Showing 19 changed files with 777 additions and 247 deletions.
29 changes: 28 additions & 1 deletion debezium-connect-rest-extension/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@
</properties>

<dependencies>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-runtime</artifactId>
Expand Down Expand Up @@ -119,7 +123,7 @@
<profile>
<id>assembly</id>
<activation>
<activeByDefault>false</activeByDefault>
<activeByDefault>true</activeByDefault>
</activation>
<build>
<plugins>
Expand Down Expand Up @@ -153,6 +157,29 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<configuration>
<executable>tar</executable>
<workingDirectory>${project.build.directory}</workingDirectory>
</configuration>
<executions>
<execution>
<id>extract-assembly</id>
<phase>pre-integration-test</phase>
<goals>
<goal>exec</goal>
</goals>
<configuration>
<arguments>
<argument>-xf</argument>
<argument>${project.artifactId}-${project.version}.tar.gz</argument>
</arguments>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@

import java.lang.Runtime.Version;
import java.lang.reflect.Field;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

Expand All @@ -25,16 +28,13 @@
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.health.ConnectClusterState;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.health.ConnectClusterStateImpl;
import org.apache.kafka.connect.runtime.isolation.PluginDesc;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.predicates.HasHeaderKey;
import org.apache.kafka.connect.transforms.predicates.RecordIsTombstone;
import org.apache.kafka.connect.transforms.predicates.TopicNameMatches;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.kafka.connect.transforms.predicates.Predicate;

import io.debezium.kcrestextension.entities.TransformsInfo;
import io.debezium.kcrestextension.entities.PredicateDefinition;
import io.debezium.kcrestextension.entities.TransformDefinition;
import io.debezium.metadata.ConnectorDescriptor;

/**
* A JAX-RS Resource class defining endpoints that enable some advanced features
Expand All @@ -43,26 +43,30 @@
* + return if topic auto-creation is available and enabled
*
*/
@Path("/debezium")
@Path(DebeziumResource.BASE_PATH)
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
public class DebeziumResource {

private static final Logger LOGGER = LoggerFactory.getLogger(DebeziumResource.class);

// TODO: This should not be so long. However, due to potentially long rebalances that may have to wait a full
// session timeout to complete, during which we cannot serve some requests. Ideally we could reduce this, but
// we need to consider all possible scenarios this could fail. It might be ok to fail with a timeout in rare cases,
// but currently a worker simply leaving the group can take this long as well.
public static final Duration REQUEST_TIMEOUT_MS = Duration.ofSeconds(90);
// Mutable for integration testing; otherwise, some tests would take at least REQUEST_TIMEOUT_MS
// to run
private static Duration requestTimeoutMs = REQUEST_TIMEOUT_MS;

private final List<TransformsInfo> transforms;
public static final String BASE_PATH = "/debezium";
public static final String CONNECTOR_PLUGINS_ENDPOINT = "/connector-plugins";
public static final String TRANSFORMS_ENDPOINT = "/transforms";
public static final String PREDICATES_ENDPOINT = "/predicates";
public static final String TOPIC_CREATION_ENDPOINT = "/topic-creation-enabled";

public static final Set<String> SUPPORTED_CONNECTORS = new HashSet<>(Arrays.asList(
"io.debezium.connector.mongodb.MongoDbConnector",
"io.debezium.connector.mysql.MySqlConnector",
"io.debezium.connector.oracle.OracleConnector",
"io.debezium.connector.postgresql.PostgresConnector",
"io.debezium.connector.sqlserver.SqlServerConnector"));

private final ConnectClusterState connectClusterState;
private Herder herder = null;
private final Boolean isTopicCreationEnabled;
private final Herder herder;
private final Map<String, ?> config;
private List<TransformDefinition> transforms = null;
private List<PredicateDefinition> predicates = null;
private List<ConnectorDescriptor> availableConnectorPlugins = null;

private static final Pattern VERSION_PATTERN = Pattern
.compile("([1-9][0-9]*(?:(?:\\.0)*\\.[1-9][0-9]*)*)(?:-([a-zA-Z0-9]+))?(?:(\\+)(0|[1-9][0-9]*)?)?(?:-([-a-zA-Z0-9.]+))?");
Expand All @@ -71,29 +75,9 @@ public class DebeziumResource {
@javax.ws.rs.core.Context
private ServletContext context;

public DebeziumResource(ConnectClusterState clusterState, Map<String, ?> config) {
Field herderField;
try {
herderField = ConnectClusterStateImpl.class.getDeclaredField("herder");
}
catch (NoSuchFieldException e) {
throw new RuntimeException(e);
}
herderField.setAccessible(true);
try {
this.herder = (Herder) herderField.get(clusterState);
}
catch (IllegalAccessException e) {
throw new RuntimeException(e);
}
this.transforms = new ArrayList<>();
this.config = config;
this.isTopicCreationEnabled = isTopicCreationEnabled();
}

// For testing purposes only
public static void setRequestTimeout(long requestTimeoutMs) {
DebeziumResource.requestTimeoutMs = Duration.ofMillis(requestTimeoutMs);
public DebeziumResource(ConnectClusterState connectClusterState, Map<String, ?> config) {
this.connectClusterState = connectClusterState;
this.isTopicCreationEnabled = isTopicCreationEnabled(config);
}

public static Version parseVersion(String version) {
Expand All @@ -107,40 +91,50 @@ else if (m.lookingAt()) {
throw new IllegalArgumentException("Invalid version string: \"" + version + "\"");
}

public static void resetRequestTimeout() {
DebeziumResource.requestTimeoutMs = REQUEST_TIMEOUT_MS;
private static <T> void addConnectorPlugins(List<ConnectorDescriptor> connectorPlugins, Collection<PluginDesc<T>> plugins) {
plugins.stream()
.filter(p -> SUPPORTED_CONNECTORS.contains(p.pluginClass().getName()))
.forEach(p -> connectorPlugins.add(new ConnectorDescriptor(p.pluginClass().getName(), p.version())));
}

@GET
@Path("/transforms")
public List<TransformsInfo> listTransforms() {
return this.getTransforms();
private synchronized void initConnectorPlugins() {
if (null == this.availableConnectorPlugins || this.availableConnectorPlugins.isEmpty()) {
// TODO: improve once plugins are allowed to be added/removed during runtime by Kafka Connect, @see org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource
final List<ConnectorDescriptor> connectorPlugins = new ArrayList<>();
Herder herder = getHerder();
addConnectorPlugins(connectorPlugins, herder.plugins().sinkConnectors());
addConnectorPlugins(connectorPlugins, herder.plugins().sourceConnectors());
this.availableConnectorPlugins = Collections.unmodifiableList(connectorPlugins);
}
}

private synchronized List<TransformsInfo> getTransforms() {
if (this.transforms.isEmpty()) {
for (PluginDesc<Transformation<?>> plugin : herder.plugins().transformations()) {
if ("org.apache.kafka.connect.runtime.PredicatedTransformation".equals(plugin.className())) {
this.transforms.add(new TransformsInfo(HasHeaderKey.class.getName(), (new HasHeaderKey<>().config())));
this.transforms.add(new TransformsInfo(RecordIsTombstone.class.getName(), (new RecordIsTombstone<>().config())));
this.transforms.add(new TransformsInfo(TopicNameMatches.class.getName(), (new TopicNameMatches<>().config())));
private synchronized void initTransformsAndPredicates() {
if (null == this.transforms || this.transforms.isEmpty()) {
final List<TransformDefinition> transformPlugins = new ArrayList<>();
final List<PredicateDefinition> predicatePlugins = new ArrayList<>();
Herder herder = getHerder();
for (PluginDesc<Transformation<?>> transformPlugin : herder.plugins().transformations()) {
if ("org.apache.kafka.connect.runtime.PredicatedTransformation".equals(transformPlugin.className())) {
for (PluginDesc<Predicate<?>> predicate : herder.plugins().predicates()) {
PredicateDefinition predicateDefinition = PredicateDefinition.fromPluginDesc(predicate);
if (null != predicateDefinition) {
predicatePlugins.add(predicateDefinition);
}
}
}
else {
this.transforms.add(new TransformsInfo(plugin));
TransformDefinition transformDefinition = TransformDefinition.fromPluginDesc(transformPlugin);
if (null != transformDefinition) {
transformPlugins.add(transformDefinition);
}
}
}
this.predicates = Collections.unmodifiableList(predicatePlugins);
this.transforms = Collections.unmodifiableList(transformPlugins);
}

return Collections.unmodifiableList(this.transforms);
}

@GET
@Path("/topic-creation")
public boolean getTopicCreationEnabled() {
return this.isTopicCreationEnabled;
}

private synchronized Boolean isTopicCreationEnabled() {
private synchronized Boolean isTopicCreationEnabled(Map<String, ?> config) {
Version kafkaConnectVersion = parseVersion(AppInfoParser.getVersion());
String topicCreationProperty = (String) config.get("topic.creation.enable");
if (null == topicCreationProperty) { // when config is not set, default to true
Expand All @@ -150,8 +144,60 @@ private synchronized Boolean isTopicCreationEnabled() {
&& Boolean.parseBoolean(topicCreationProperty);
}

private synchronized Herder getHerder() {
if (null == this.herder) {
Field herderField;
try {
herderField = this.connectClusterState.getClass().getDeclaredField("herder");
}
catch (NoSuchFieldException e) {
throw new RuntimeException(e);
}
herderField.setAccessible(true);
try {
this.herder = (Herder) herderField.get(this.connectClusterState);
}
catch (IllegalAccessException e) {
throw new RuntimeException(e);
}
}
return this.herder;
}

@GET
@Path(CONNECTOR_PLUGINS_ENDPOINT)
@Produces(MediaType.APPLICATION_JSON)
public List<ConnectorDescriptor> availableDebeziumConnectors() {
initConnectorPlugins();
return this.availableConnectorPlugins;
}

@GET
@Path(TRANSFORMS_ENDPOINT)
@Produces(MediaType.APPLICATION_JSON)
public List<TransformDefinition> listTransforms() {
initTransformsAndPredicates();
return this.transforms;
}

@GET
@Path(PREDICATES_ENDPOINT)
@Produces(MediaType.APPLICATION_JSON)
public List<PredicateDefinition> listPredicates() {
initTransformsAndPredicates();
return this.predicates;
}

@GET
@Path(TOPIC_CREATION_ENDPOINT)
@Produces(MediaType.APPLICATION_JSON)
public boolean getTopicCreationEnabled() {
return this.isTopicCreationEnabled;
}

@GET
@Path("/version")
@Produces(MediaType.APPLICATION_JSON)
public String getDebeziumVersion() {
return Module.version();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,44 +10,22 @@
import java.util.Objects;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.runtime.isolation.PluginDesc;
import org.apache.kafka.connect.transforms.Transformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

/**
* JSON model that describes a Single Message Transform (SMT) entry.
* Base class for JSON models that describes a Single Message Transform (SMT) entry or a Kafka Connect Predicate entry.
*/
public class TransformsInfo {
abstract class PluginDefinition {

private static final Logger LOGGER = LoggerFactory.getLogger(TransformsInfo.class);
protected final String className;
protected final Map<String, PropertyDescriptor> properties;

private final String className;
private final Map<String, PropertyDescriptor> properties;

@JsonCreator
public TransformsInfo(String className, ConfigDef config) {
PluginDefinition(String className, ConfigDef config) {
this.className = className;
this.properties = getConfigProperties(className, config);
}

@JsonCreator
public TransformsInfo(String className, Class<? extends Transformation<?>> transformationClass) {
this.className = className;
try {
LOGGER.info("Loading config for TRANSFORM: " + className + "...");
this.properties = getConfigProperties(transformationClass.getName(), transformationClass.newInstance().config());
}
catch (InstantiationException | IllegalAccessException e) {
LOGGER.error("Unable to load TRANSFORM: " + className
+ "\n\t Reason: " + e.toString());
throw new RuntimeException(e);
}
}

private static Map<String, PropertyDescriptor> getConfigProperties(String className, ConfigDef configDef) {
Map<String, PropertyDescriptor> configProperties = new HashMap<>();
configDef.configKeys().forEach((fieldName, configKey) -> {
Expand All @@ -60,15 +38,6 @@ private static Map<String, PropertyDescriptor> getConfigProperties(String classN
return configProperties;
}

public TransformsInfo(PluginDesc<Transformation<?>> transform) {
this(transform.className(), transform.pluginClass());
}

@JsonProperty("transform")
public String className() {
return this.className;
}

@JsonProperty
public Map<String, PropertyDescriptor> properties() {
return this.properties;
Expand All @@ -79,7 +48,7 @@ public boolean equals(Object o) {
return true;
}
else if (o != null && this.getClass() == o.getClass()) {
TransformsInfo that = (TransformsInfo) o;
PluginDefinition that = (PluginDefinition) o;
return Objects.equals(this.className, that.className)
&& Objects.equals(this.properties, that.properties);
}
Expand All @@ -93,7 +62,7 @@ public int hashCode() {
}

public String toString() {
return "ConnectorPluginInfo{" + "className='" + this.className + '\'' +
return "PluginDefinition{" + "className='" + this.className + '\'' +
", documentation='" + this.properties + '\'' +
'}';
}
Expand Down
Loading

0 comments on commit 32161c9

Please sign in to comment.