Skip to content

Commit

Permalink
Merge pull request #225 from SourceLabOrg/sp/clusterCustomOptions
Browse files Browse the repository at this point in the history
Add ability to set custom client options to cluster config
  • Loading branch information
Crim authored Jul 6, 2020
2 parents 9243333 + 791f3c7 commit 9f92a91
Show file tree
Hide file tree
Showing 30 changed files with 1,172 additions and 32 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,15 @@
The format is based on [Keep a Changelog](http://keepachangelog.com/)
and this project adheres to [Semantic Versioning](http://semver.org/).

## 2.7.0 (UNRELEASED)
#### New Features
- [PR-225](https://github.com/SourceLabOrg/kafka-webview/pull/225)
- Adds the ability to set custom kafka client properties when defining a cluster.
- Adds a new debugging tool under `/configuration/cluster` to see the generated kafka client properties.

#### Internal Dependency Updates
- Updated Kafka Client library version from 2.0.1 to 2.2.2.

## 2.6.0 (06/21/2020)
- [ISSUE-144](https://github.com/SourceLabOrg/kafka-webview/issues/144) Make providing a TrustStore file when setting up a SSL enabled cluster optional. You might not want/need this option if your JVM is already configured to accept the SSL certificate served by the cluster, or if the cluster's certificate can be validated by a publically accessible CA.
- [PR-215](https://github.com/SourceLabOrg/kafka-webview/pull/215) Improve errors displayed when using the `test cluster` functionality.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.slf4j.LoggerFactory;

import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import com.unboundid.ldap.listener.InMemoryDirectoryServerConfig;
import com.unboundid.ldap.listener.InMemoryListenerConfig;
import com.unboundid.ldap.sdk.LDAPException;
import com.unboundid.ldap.sdk.OperationType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
2 changes: 1 addition & 1 deletion kafka-webview-ui/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
<!-- Dependency versions -->
<avro.version>1.8.2</avro.version>
<bootstrap.version>4.0.0-beta</bootstrap.version>
<kafka.version>2.0.1</kafka.version>
<kafka.version>2.2.2</kafka.version>
<protobuf.version>3.6.1</protobuf.version>
<thymeleaf.version>3.0.11.RELEASE</thymeleaf.version>
</properties>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.sourcelab.kafka.webview.ui.manager.SensitiveConfigScrubber;
import org.sourcelab.kafka.webview.ui.manager.encryption.SecretManager;
import org.sourcelab.kafka.webview.ui.manager.kafka.KafkaAdminFactory;
import org.sourcelab.kafka.webview.ui.manager.kafka.KafkaClientConfigUtil;
Expand Down Expand Up @@ -202,4 +203,14 @@ public KafkaClientConfigUtil getKafkaClientConfigUtil(final AppProperties appPro
public SaslUtility getSaslUtility(final SecretManager secretManager) {
return new SaslUtility(secretManager);
}

/**
* For scrubbing sensitive values from client configs.
* @param saslUtility instance.
* @return instance.
*/
@Bean
public SensitiveConfigScrubber getSensitiveConfigScrubber(final SaslUtility saslUtility) {
return new SensitiveConfigScrubber(saslUtility);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,9 @@
import org.sourcelab.kafka.webview.ui.manager.ui.FlashMessage;
import org.sourcelab.kafka.webview.ui.manager.ui.datatable.Datatable;
import org.sourcelab.kafka.webview.ui.manager.ui.datatable.DatatableColumn;
import org.sourcelab.kafka.webview.ui.manager.ui.datatable.DatatableFilter;
import org.sourcelab.kafka.webview.ui.manager.ui.datatable.LinkTemplate;
import org.sourcelab.kafka.webview.ui.manager.ui.datatable.YesNoBadgeTemplate;
import org.sourcelab.kafka.webview.ui.model.Cluster;
import org.sourcelab.kafka.webview.ui.model.View;
import org.sourcelab.kafka.webview.ui.repository.ClusterRepository;
import org.sourcelab.kafka.webview.ui.repository.ViewRepository;
import org.springframework.beans.factory.annotation.Autowired;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,16 @@

package org.sourcelab.kafka.webview.ui.controller.configuration.cluster;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.sourcelab.kafka.webview.ui.controller.BaseController;
import org.sourcelab.kafka.webview.ui.controller.configuration.cluster.forms.ClusterForm;
import org.sourcelab.kafka.webview.ui.manager.SensitiveConfigScrubber;
import org.sourcelab.kafka.webview.ui.manager.encryption.SecretManager;
import org.sourcelab.kafka.webview.ui.manager.kafka.KafkaClientConfigUtil;
import org.sourcelab.kafka.webview.ui.manager.kafka.KafkaOperations;
import org.sourcelab.kafka.webview.ui.manager.kafka.KafkaOperationsFactory;
import org.sourcelab.kafka.webview.ui.manager.plugin.UploadManager;
Expand All @@ -51,7 +55,11 @@

import javax.validation.Valid;
import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

/**
* Controller for Cluster CRUD operations.
Expand All @@ -76,6 +84,9 @@ public class ClusterConfigController extends BaseController {
@Autowired
private SaslUtility saslUtility;

@Autowired
private SensitiveConfigScrubber sensitiveConfigScrubber;

/**
* GET Displays main configuration index.
*/
Expand All @@ -98,10 +109,16 @@ public String index(final Model model) {
public String createClusterForm(final ClusterForm clusterForm, final Model model) {
// Setup breadcrumbs
setupBreadCrumbs(model, "Create", "/configuration/cluster/create");
setupCreateForm(model);

return "configuration/cluster/create";
}

private void setupCreateForm(final Model model) {
// Load all available properties
model.addAttribute("kafkaSettings", KafkaClientConfigUtil.getAllKafkaConsumerProperties());
}

/**
* GET Displays edit cluster form.
*/
Expand All @@ -112,6 +129,9 @@ public String editClusterForm(
final RedirectAttributes redirectAttributes,
final Model model) {

// Initial setup
setupCreateForm(model);

// Retrieve by id
final Optional<Cluster> clusterOptional = clusterRepository.findById(id);
if (!clusterOptional.isPresent()) {
Expand Down Expand Up @@ -152,6 +172,23 @@ public String editClusterForm(
clusterForm.setSaslPassword(saslProperties.getPlainPassword());
clusterForm.setSaslCustomJaas(saslProperties.getJaas());

// Deserialize message parameters json string into a map
final ObjectMapper objectMapper = new ObjectMapper();
Map<String, String> customOptions;
try {
customOptions = objectMapper.readValue(cluster.getOptionParameters(), Map.class);
} catch (final IOException e) {
// Fail safe?
customOptions = new HashMap<>();
}

// Update form object with properties.
for (final Map.Entry<String, String> entry : customOptions.entrySet()) {
clusterForm.getCustomOptionNames().add(entry.getKey());
clusterForm.getCustomOptionValues().add(entry.getValue());
}
clusterForm.setCustomOptionsEnabled(!customOptions.entrySet().isEmpty());

// Display template
return "configuration/cluster/create";
}
Expand All @@ -163,7 +200,11 @@ public String editClusterForm(
public String clusterUpdate(
@Valid final ClusterForm clusterForm,
final BindingResult bindingResult,
final RedirectAttributes redirectAttributes) {
final RedirectAttributes redirectAttributes,
final Model model) {

// Initial Setup.
setupCreateForm(model);

final boolean updateExisting = clusterForm.exists();

Expand Down Expand Up @@ -368,9 +409,13 @@ else if (!clusterForm.exists() || (clusterForm.getTrustStoreFile() != null && !c
cluster.setSaslConfig("");
}

// Handle custom options, convert into a JSON string.
final String jsonStr = handleCustomOptions(clusterForm);

// Update properties
cluster.setName(clusterForm.getName());
cluster.setBrokerHosts(clusterForm.getBrokerHosts());
cluster.setOptionParameters(jsonStr);
cluster.setValid(false);
clusterRepository.save(cluster);

Expand All @@ -382,6 +427,30 @@ else if (!clusterForm.exists() || (clusterForm.getTrustStoreFile() != null && !c
return "redirect:/configuration/cluster";
}

/**
* Handles getting custom defined options and values.
* @param clusterForm The submitted form.
*/
private String handleCustomOptions(final ClusterForm clusterForm) {
// If the checkbox is unselected, then just return "{}"
if (!clusterForm.getCustomOptionsEnabled()) {
return "{}";
}

// Build a map of Name => Value
final Map<String, String> mappedOptions = clusterForm.getCustomOptionsAsMap();

// For converting map to json string
final ObjectMapper objectMapper = new ObjectMapper();

try {
return objectMapper.writeValueAsString(mappedOptions);
} catch (final JsonProcessingException e) {
// Fail safe?
return "{}";
}
}

/**
* POST deletes the selected cluster.
*/
Expand Down Expand Up @@ -463,6 +532,47 @@ public String testCluster(@PathVariable final Long id, final RedirectAttributes
return "redirect:/configuration/cluster";
}

/**
* GET for getting client configuration.
*/
@RequestMapping(path = "/config/{id}", method = RequestMethod.GET)
public String getClientConfig(
@PathVariable final Long id,
final RedirectAttributes redirectAttributes,
final Model model
) {
// Retrieve it
final Optional<Cluster> clusterOptional = clusterRepository.findById(id);
if (!clusterOptional.isPresent()) {
// Set flash message & redirect
redirectAttributes.addFlashAttribute("FlashMessage", FlashMessage.newWarning("Unable to find cluster!"));

// redirect to cluster index
return "redirect:/configuration/cluster";
}
final Cluster cluster = clusterOptional.get();

// Setup breadcrumbs
setupBreadCrumbs(model, "Client Config: " + cluster.getName(), null);

// Generate configs with sensitive fields scrubbed.
final Map<String, Object> configs = sensitiveConfigScrubber.filterSensitiveOptions(
kafkaOperationsFactory.getConsumerConfig(cluster, getLoggedInUserId()),
cluster
)
// Sort by key for easier display.
.entrySet().stream()
.sorted(Map.Entry.comparingByKey())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (oldValue, newValue) -> oldValue, LinkedHashMap::new));

// Render
model.addAttribute("configs", configs);
model.addAttribute("cluster", cluster);

// Render cluster config template
return "configuration/cluster/config";
}

private void setupBreadCrumbs(final Model model, final String name, final String url) {
// Setup breadcrumbs
final BreadCrumbManager manager = new BreadCrumbManager(model)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@

import javax.validation.constraints.NotNull;
import javax.validation.constraints.Size;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
* Represents the form for creating/updating the Cluster entity.
Expand Down Expand Up @@ -91,6 +96,20 @@ public class ClusterForm {
*/
private String saslCustomJaas;

// Custom Client Properties
private Boolean customOptionsEnabled;

/**
* Names of custom options.
*/
private List<String> customOptionNames = new ArrayList<>();

/**
* Values of custom options.
*/
private List<String> customOptionValues = new ArrayList<>();


public Long getId() {
return id;
}
Expand Down Expand Up @@ -261,6 +280,66 @@ public boolean isPlainSaslMechanism() {
return "PLAIN".equals(saslMechanism);
}

/**
* Utility method to return custom options as a map.
*/
public Map<String, String> getCustomOptionsAsMap() {
// Build a map of Name => Value
final Map<String, String> mappedOptions = new HashMap<>();

final Iterator<String> names = getCustomOptionNames().iterator();
final Iterator<String> values = getCustomOptionValues().iterator();

while (names.hasNext()) {
final String name = names.next();
final String value;
if (values.hasNext()) {
value = values.next();
} else {
value = "";
}
mappedOptions.put(name, value);
}
return mappedOptions;
}

public List<String> getCustomOptionNames() {
return customOptionNames;
}

public void setCustomOptionNames(final List<String> customOptionNames) {
this.customOptionNames = customOptionNames;
}

public List<String> getCustomOptionValues() {
return customOptionValues;
}

public void setCustomOptionValues(final List<String> customOptionValues) {
this.customOptionValues = customOptionValues;
}

/**
* Enable/Disable flag for custom client options.
*/
public Boolean getCustomOptionsEnabled() {
if (customOptionsEnabled == null) {
return customOptionsEnabled = false;
}
return customOptionsEnabled;
}

/**
* Enable/Disable flag for custom client options.
*/
public void setCustomOptionsEnabled(final Boolean customOptionsEnabled) {
if (customOptionsEnabled == null) {
this.customOptionsEnabled = false;
} else {
this.customOptionsEnabled = customOptionsEnabled;
}
}

@Override
public String toString() {
return "ClusterForm{"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.sourcelab.kafka.webview.ui.model.Cluster;
import org.sourcelab.kafka.webview.ui.model.View;
import org.sourcelab.kafka.webview.ui.repository.ClusterRepository;
import org.sourcelab.kafka.webview.ui.repository.MessageFormatRepository;
import org.sourcelab.kafka.webview.ui.repository.ViewRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Pageable;
Expand Down
Loading

0 comments on commit 9f92a91

Please sign in to comment.