Skip to content

Commit

Permalink
KYLO-2866 - configure Sqoop controller services from Kylo Catalog
Browse files Browse the repository at this point in the history
  • Loading branch information
scottreisdorf committed Dec 6, 2018
1 parent 709cac2 commit 3d67a59
Show file tree
Hide file tree
Showing 4 changed files with 194 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.thinkbiganalytics.kylo.catalog.rest.model.Connector;
import com.thinkbiganalytics.kylo.catalog.rest.model.ConnectorPluginDescriptor;
import com.thinkbiganalytics.kylo.catalog.rest.model.ConnectorPluginNiFiControllerService;
import com.thinkbiganalytics.kylo.catalog.rest.model.ConnectorPluginNiFiControllerServicePropertyDescriptor;
import com.thinkbiganalytics.kylo.catalog.rest.model.DataSetTemplate;
import com.thinkbiganalytics.kylo.catalog.rest.model.DataSource;
import com.thinkbiganalytics.kylo.catalog.rest.model.DefaultDataSetTemplate;
Expand Down Expand Up @@ -56,7 +57,6 @@
import org.springframework.stereotype.Component;
import org.springframework.util.PropertyPlaceholderHelper;

import java.util.AbstractMap;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
Expand Down Expand Up @@ -163,6 +163,7 @@ public DataSourceProvider(@Nonnull final DatasourceProvider feedDataSourceProvid
@Nonnull
public DataSource createDataSource(@Nonnull final DataSource source) {
return metadataService.commit(() -> {

// Find connector
final com.thinkbiganalytics.metadata.api.catalog.Connector connector = Optional.ofNullable(source.getConnector()).map(Connector::getId).map(connectorProvider::resolveId)
.flatMap(connectorProvider::find).orElseThrow(() -> new CatalogException("catalog.datasource.connector.invalid"));
Expand All @@ -174,7 +175,7 @@ public DataSource createDataSource(@Nonnull final DataSource source) {
final ConnectorPluginDescriptor plugin = pluginManager.getPlugin(connector.getPluginId()).map(ConnectorPlugin::getDescriptor).orElse(null);

if (plugin != null && plugin.getNifiControllerService() != null) {
createOrUpdateNiFiControllerService(source, plugin.getNifiControllerService());
createOrUpdateNiFiControllerService(source, plugin);
}

// Update catalog
Expand Down Expand Up @@ -279,7 +280,7 @@ public DataSource updateDataSource(@Nonnull final DataSource source) {
final ConnectorPluginDescriptor plugin = pluginManager.getPlugin(domain.getConnector().getPluginId()).map(ConnectorPlugin::getDescriptor).orElse(null);

if (plugin != null && plugin.getNifiControllerService() != null) {
createOrUpdateNiFiControllerService(source, plugin.getNifiControllerService());
createOrUpdateNiFiControllerService(source, plugin);
}

// Update catalog
Expand All @@ -294,13 +295,22 @@ public DataSource updateDataSource(@Nonnull final DataSource source) {
/**
* Creates or updates the NiFi controller service linked to the specified data source.
*/
private void createOrUpdateNiFiControllerService(@Nonnull final DataSource dataSource, @Nonnull final ConnectorPluginNiFiControllerService plugin) {
private void createOrUpdateNiFiControllerService(@Nonnull final DataSource dataSource, @Nonnull final ConnectorPluginDescriptor connectorPluginDescriptor) {

ConnectorPluginNiFiControllerService plugin = connectorPluginDescriptor.getNifiControllerService();
// Resolve properties
final PropertyPlaceholderHelper.PlaceholderResolver resolver = new DataSourcePlaceholderResolver(dataSource);
final Map<String, String> properties = new HashMap<>(plugin.getProperties().size());
final Map<String, ConnectorPluginNiFiControllerServicePropertyDescriptor> propertyDescriptors = plugin.getPropertyDescriptors();

plugin.getProperties().forEach((key, value) -> {
final String resolvedValue = placeholderHelper.replacePlaceholders(value, resolver);
if (resolvedValue != null && !resolvedValue.startsWith("{cipher}")) {
//set empty string to null
ConnectorPluginNiFiControllerServicePropertyDescriptor descriptor = propertyDescriptors != null ? propertyDescriptors.get(key) : null;
if (StringUtils.isBlank(resolvedValue) && (descriptor == null || (descriptor != null && !descriptor.isEmptyStringIfNull()))) {
//set the value to null if its not explicitly configured to be set to an empty string
properties.put(key, null);
} else if (resolvedValue != null && !resolvedValue.startsWith("{cipher}")) {
properties.put(key, resolvedValue);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,111 @@
}
]
},
{
"title": "JDBC - Sqoop",
"pluginId": "jdbc-sqoop",
"format": "jdbc",
"tabs": [
{
"label": "Connection",
"sref": ".connection"
}
],
"options": [
{
"key": "url",
"label": "Connection URL",
"hint":"A database connection URL used to connect to a database. May contain database system name, host, port, database name and some parameters. For example 'jdbc:mysql://localhost:3306/kylo' ",
"validators": [
{
"type": "url",
"params": {
"protocol": "jdbc:"
}
}
]
},
{
"key": "driver",
"label": "Driver Class Name",
"hint":"Database driver class name. For example 'org.mariadb.jdbc.Driver'. This is required for Kylo to make the catalog connection. It will not be used when Sqoop is invoked."
},
{
"key": "jars",
"label": "Driver Location(s)",
"hint":"Comma-separated list of paths containing the driver JAR and dependencies. Path must be accessible to Kylo, Nifi, and Spark. For example: 'file:///var/shared/mariadb-java-client-1.1.7.jar,s3a:///mybucket/db.jar'. This is required for Kylo to make the catalog connection. It will not be used when Sqoop is invoked.",
"required": false,
"validators": [
{
"type": "jars"
}
]
},
{
"key": "user",
"label": "User",
"hint":"Database user name"
},
{
"key": "password",
"type": "password",
"label": "Password",
"sensitive": true,
"required": false,
"hint":"The password for the database user either blank, if using password file, encrypted base64 or clear text, depending upon the 'password mode' setting "
},
{
"key": "passwordMode",
"label": "Password Mode",
"type":"select",
"selections": [{"label": "Encrypted on HDFS","value": "ENCRYPTED_ON_HDFS_FILE"},
{"label": "Clear Text", "value": "CLEAR_TEXT_ENTRY"},
{"label": "Encrypted text entry","value": "ENCRYPTED_TEXT_ENTRY"}
]
,"hint":"The type of password and how its provided"
},
{
"key": "sourcePasswordFile",
"label": "Source Password File",
"hint":"HDFS location containing the encrypted password",
"required": false
},
{
"key": "sourcePasswordPassphrase",
"label": "Source Password Passphrase",
"sensitive": true,
"hint":"decryption key",
"required": false
}
],
"nifiControllerService": {
"type": "com.thinkbiganalytics.nifi.v2.sqoop.StandardSqoopConnectionService",
"properties": {
"Source Connection String": "${url}",
"Source User Name": "${user}",
"Password Mode": "${passwordMode}",
"Source Password File": "${sourcePasswordFile}",
"Source Password Passphrase": "${sourcePasswordPassphrase}",
"Source Password (Encrypted Base64/Clear Text)": "${password}"
}
},
"nifiProperties": [
{
"processorTypes": ["com.thinkbiganalytics.nifi.GetTableData", "com.thinkbiganalytics.nifi.v2.ingest.GetTableData"],
"properties": {
"Source Database Connection": "{{ dataSource.nifiControllerServiceId }}",
"Source Table": "{{ options.dbname }}"
}
},
{
"processorTypes": ["com.thinkbiganalytics.nifi.v2.sqoop.core.ImportSqoop"],
"properties": {
"Source Connection Service": "{{ dataSource.nifiControllerServiceId }}",
"Source Table": "{{ options.dbname }}"
}
}
]
},
{
"title": "Local File System",
"pluginId": "local-file-system",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ public class ConnectorPluginNiFiControllerService {
*/
private Map<String, String> properties;


/**
* Map of controller service property key to propertydescriptors
*/
private Map<String, ConnectorPluginNiFiControllerServicePropertyDescriptor> propertyDescriptors;

/**
* Type of controller service
*/
Expand All @@ -45,9 +51,21 @@ public ConnectorPluginNiFiControllerService() {

public ConnectorPluginNiFiControllerService(@Nonnull final ConnectorPluginNiFiControllerService other) {
properties = (other.properties != null) ? new HashMap<>(other.properties) : null;
propertyDescriptors = copyDescriptorMap(other.propertyDescriptors);
type = other.type;
}

private Map<String, ConnectorPluginNiFiControllerServicePropertyDescriptor> copyDescriptorMap(Map<String, ConnectorPluginNiFiControllerServicePropertyDescriptor> map){
if(map != null){
Map<String, ConnectorPluginNiFiControllerServicePropertyDescriptor> copy = new HashMap<>();
for(Map.Entry<String,ConnectorPluginNiFiControllerServicePropertyDescriptor> entry: map.entrySet()){
copy.put(entry.getKey(),new ConnectorPluginNiFiControllerServicePropertyDescriptor(entry.getValue()));
}
return copy;
}
return null;
}

public Map<String, String> getProperties() {
return properties;
}
Expand All @@ -63,4 +81,12 @@ public String getType() {
public void setType(String type) {
this.type = type;
}

public Map<String, ConnectorPluginNiFiControllerServicePropertyDescriptor> getPropertyDescriptors() {
return propertyDescriptors;
}

public void setPropertyDescriptors(Map<String, ConnectorPluginNiFiControllerServicePropertyDescriptor> propertyDescriptors) {
this.propertyDescriptors = propertyDescriptors;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package com.thinkbiganalytics.kylo.catalog.rest.model;

/*-
* #%L
* kylo-catalog-model
* %%
* Copyright (C) 2017 - 2018 ThinkBig Analytics, a Teradata Company
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/

import javax.annotation.Nonnull;

/**
* additional options for the nifi controller service properties
*/
public class ConnectorPluginNiFiControllerServicePropertyDescriptor {

private boolean emptyStringIfNull;


public ConnectorPluginNiFiControllerServicePropertyDescriptor(boolean emptyStringIfNull) {
this.emptyStringIfNull = emptyStringIfNull;
}

public ConnectorPluginNiFiControllerServicePropertyDescriptor(@Nonnull final ConnectorPluginNiFiControllerServicePropertyDescriptor other) {
emptyStringIfNull = other.emptyStringIfNull;
}

public boolean isEmptyStringIfNull() {
return emptyStringIfNull;
}

public void setEmptyStringIfNull(boolean emptyStringIfNull) {
this.emptyStringIfNull = emptyStringIfNull;
}
}

0 comments on commit 3d67a59

Please sign in to comment.