Skip to content

Add kerberos authentication support #328

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ Use the below schema to configure Splunk Connect for Kafka
"splunk.hec.json.event.formatted": "<true|false>",
"splunk.hec.ssl.trust.store.path": "<Java KeyStore location>",
"splunk.hec.ssl.trust.store.password": "<Java KeyStore password>",
"kerberos.user.principal": "<The Kerberos user principal the connector may use to authenticate with Kerberos>",
"kerberos.keytab.path": "<The path to the keytab file to use for authentication with Kerberos>"
}
}
```
Expand Down Expand Up @@ -198,6 +200,12 @@ Use the below schema to configure Splunk Connect for Kafka
| `splunk.header.sourcetype` | This setting specifies the Kafka record header key which will determine the sourcetype value for the Splunk event. This setting is only applicable when `splunk.header.support` is set to `true`. | `splunk.header.sourcetype` |
| `splunk.header.host` | This setting specifies the Kafka record header key which will determine the host value for the Splunk event. This setting is only applicable when `splunk.header.support` is set to `true`. | `splunk.header.host` |

### Kerberos Parameters
| Name | Description | Default Value |
|-------- |----------------------------|-----------------------|
| `kerberos.user.principal` | The Kerberos user principal the connector may use to authenticate with Kerberos. | `""` |
| `kerberos.keytab.path` | The path to the keytab file to use for authentication with Kerberos. | `""` |

## Load balancing

See [Splunk Docs](https://docs.splunk.com/Documentation/KafkaConnect/latest/User/LoadBalancing) for considerations when using load balancing in your deployment.
Expand Down
11 changes: 10 additions & 1 deletion src/main/java/com/splunk/hecclient/Hec.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
import org.apache.kafka.connect.errors.ConnectException;

/**
* Hec is the central class which will construct the HTTP Event Collector Client to send messages to Splunk.
Expand Down Expand Up @@ -196,7 +197,7 @@ public static HecAckPoller createPoller(HecConfig config, PollerCallback callbac
public Hec(HecConfig config, CloseableHttpClient httpClient, Poller poller, LoadBalancerInf loadBalancer) {
for (int i = 0; i < config.getTotalChannels(); ) {
for (String uri : config.getUris()) {
Indexer indexer = new Indexer(uri, config.getToken(), httpClient, poller);
Indexer indexer = new Indexer(uri, httpClient, poller, config);
indexer.setKeepAlive(config.getHttpKeepAlive());
indexer.setBackPressureThreshold(config.getBackoffThresholdSeconds());
loadBalancer.add(uri, indexer.getChannel().setTracking(config.getEnableChannelTracking()));
Expand Down Expand Up @@ -266,6 +267,14 @@ public final void close() {
public static CloseableHttpClient createHttpClient(final HecConfig config) {
int poolSizePerDest = config.getMaxHttpConnectionPerChannel();

if (config.kerberosAuthEnabled()) {
try {
return new HttpClientBuilder().buildKerberosClient();
} catch (KeyStoreException | NoSuchAlgorithmException | KeyManagementException ex) {
throw new ConnectException("Unable to build Kerberos Client", ex);
}
}

// Code block for default client construction
if(!config.getHasCustomTrustStore() &&
StringUtils.isBlank(config.getTrustStorePath()) &&
Expand Down
24 changes: 24 additions & 0 deletions src/main/java/com/splunk/hecclient/HecConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public final class HecConfig {
private String trustStorePath;
private String trustStorePassword;
private int lbPollInterval = 120; // in seconds
private String kerberosPrincipal;
private String kerberosKeytabPath;

public HecConfig(List<String> uris, String token) {
this.uris = uris;
Expand Down Expand Up @@ -178,4 +180,26 @@ public HecConfig setBackoffThresholdSeconds(int backoffSeconds) {
backoffThresholdSeconds = backoffSeconds * 1000;
return this;
}

/**
 * Returns the configured Kerberos user principal, or {@code null} if
 * {@link #setKerberosPrincipal(String)} was never called.
 */
public String kerberosPrincipal() {
return kerberosPrincipal;
}

/**
 * Sets the Kerberos user principal used for authentication.
 *
 * @param kerberosPrincipal the Kerberos principal name
 * @return this config, for call chaining
 */
public HecConfig setKerberosPrincipal(String kerberosPrincipal) {
this.kerberosPrincipal = kerberosPrincipal;
return this;
}

/**
 * Returns the configured path to the Kerberos keytab file, or {@code null}
 * if {@link #setKerberosKeytabPath(String)} was never called.
 */
public String kerberosKeytabLocation() {
return kerberosKeytabPath;
}

/**
 * Sets the path to the keytab file used for Kerberos authentication.
 *
 * @param kerberosKeytabPath filesystem path to the keytab file
 * @return this config, for call chaining
 */
public HecConfig setKerberosKeytabPath(String kerberosKeytabPath) {
this.kerberosKeytabPath = kerberosKeytabPath;
return this;
}

/**
 * Whether Kerberos authentication is enabled for this configuration.
 * Kerberos is considered enabled iff a non-empty user principal is set.
 *
 * @return true when a non-empty Kerberos principal has been configured
 */
public boolean kerberosAuthEnabled() {
    // Null-guard: kerberosPrincipal stays null until setKerberosPrincipal() is
    // called, and the original !kerberosPrincipal().isEmpty() would NPE then.
    return kerberosPrincipal != null && !kerberosPrincipal.isEmpty();
}
}
45 changes: 45 additions & 0 deletions src/main/java/com/splunk/hecclient/HttpClientBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,24 @@
*/
package com.splunk.hecclient;

import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.Principal;
import org.apache.http.auth.AuthSchemeProvider;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.Credentials;
import org.apache.http.client.config.AuthSchemes;
import org.apache.http.client.config.CookieSpecs;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.config.Lookup;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.config.SocketConfig;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.conn.ssl.TrustStrategy;
import org.apache.http.impl.auth.SPNegoSchemeFactory;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.ssl.SSLContextBuilder;
Expand All @@ -29,6 +42,8 @@
import javax.net.ssl.SSLSession;
import java.security.cert.X509Certificate;



public final class HttpClientBuilder {
private int maxConnectionPoolSizePerDestination = 4;
private int maxConnectionPoolSize = 4 * 2;
Expand Down Expand Up @@ -87,6 +102,36 @@ public CloseableHttpClient build() {
.build();
}

/**
 * Builds a CloseableHttpClient configured for Kerberos (SPNEGO) authentication.
 *
 * <p>The SPNEGO auth scheme is registered and a placeholder {@link Credentials}
 * (null principal/password) is installed, as required by the HttpClient API —
 * the actual credentials come from the JAAS/keytab login performed by the
 * caller around each request.
 *
 * @return an HTTP client with SPNEGO auth enabled
 * @throws KeyStoreException        if the trust material cannot be loaded
 * @throws NoSuchAlgorithmException if the SSL context algorithm is unavailable
 * @throws KeyManagementException   if the SSL context cannot be initialized
 */
public CloseableHttpClient buildKerberosClient() throws KeyStoreException, NoSuchAlgorithmException, KeyManagementException {
    org.apache.http.impl.client.HttpClientBuilder builder =
            org.apache.http.impl.client.HttpClientBuilder.create();

    // Register SPNEGO (Kerberos) as the authentication scheme.
    Lookup<AuthSchemeProvider> authSchemeRegistry = RegistryBuilder.<AuthSchemeProvider>create()
            .register(AuthSchemes.SPNEGO, new SPNegoSchemeFactory(true)).build();
    builder.setDefaultAuthSchemeRegistry(authSchemeRegistry);

    // HttpClient requires a Credentials object even though SPNEGO derives the
    // real credentials from the JAAS subject; null principal/password is the
    // conventional placeholder.
    BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
    credentialsProvider.setCredentials(new AuthScope(null, -1, null), new Credentials() {
        @Override
        public Principal getUserPrincipal() {
            return null;
        }

        @Override
        public String getPassword() {
            return null;
        }
    });
    builder.setDefaultCredentialsProvider(credentialsProvider);

    // SECURITY NOTE(review): the trust-all strategy below combined with
    // NoopHostnameVerifier disables TLS certificate AND hostname verification
    // for the Kerberos client, exposing it to man-in-the-middle attacks.
    // Preserved for backward compatibility — consider honoring the configured
    // trust store instead.
    SSLContextBuilder sslContextBuilder = new SSLContextBuilder();
    sslContextBuilder.loadTrustMaterial(null, (chain, authType) -> true);
    SSLConnectionSocketFactory sslSocketFactory = new SSLConnectionSocketFactory(
            sslContextBuilder.build(), NoopHostnameVerifier.INSTANCE);
    builder.setSSLSocketFactory(sslSocketFactory);

    return builder.build();
}


private SSLConnectionSocketFactory getSSLConnectionFactory() {
if (disableSSLCertVerification) {
return getUnsecureSSLConnectionSocketFactory();
Expand Down
88 changes: 81 additions & 7 deletions src/main/java/com/splunk/hecclient/Indexer.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,18 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.splunk.kafka.connect.VersionUtils;
import com.sun.security.auth.module.Krb5LoginModule;
import java.security.Principal;
import java.security.PrivilegedAction;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import javax.security.auth.Subject;
import javax.security.auth.kerberos.KerberosPrincipal;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;
import javax.security.auth.login.LoginContext;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
Expand All @@ -38,6 +50,8 @@ final class Indexer implements IndexerInf {
private static final Logger log = LoggerFactory.getLogger(Indexer.class);
private static final ObjectMapper jsonMapper = new ObjectMapper();

private HecConfig hecConfig;
private Configuration config;
private CloseableHttpClient httpClient;
private HttpContext context;
private String baseUrl;
Expand All @@ -51,10 +65,12 @@ final class Indexer implements IndexerInf {
private long backPressureThreshold = 60 * 1000; // 1 min

// Indexer doesn't own client, ack poller
public Indexer(String baseUrl, String hecToken, CloseableHttpClient client, Poller poller) {
public Indexer(String baseUrl,CloseableHttpClient client,Poller poller,HecConfig config) {
this.httpClient = client;
this.baseUrl = baseUrl;
this.hecToken = hecToken;
this.hecConfig = config;
this.hecToken = config.getToken();
this.poller = poller;
this.context = HttpClientContext.create();
backPressure = 0;
Expand Down Expand Up @@ -147,17 +163,73 @@ public boolean send(final EventBatch batch) {
@Override
public synchronized String executeHttpRequest(final HttpUriRequest req) {
CloseableHttpResponse resp;
try {
resp = httpClient.execute(req, context);
} catch (Exception ex) {
logBackPressure();
log.error("encountered io exception", ex);
throw new HecException("encountered exception when post data", ex);
if (hecConfig.kerberosAuthEnabled()) {
if (config == null) {
defineKerberosConfigs();
}
Set<Principal> principals = new HashSet<>(1);
principals.add(new KerberosPrincipal(hecConfig.kerberosPrincipal()));
Subject subject = new Subject(false, principals, new HashSet<>(), new HashSet<>());
try {
LoginContext lc = new LoginContext("SplunkSinkConnector", subject, null, config);
lc.login();
Subject serviceSubject = lc.getSubject();
resp = Subject.doAs(serviceSubject, (PrivilegedAction<CloseableHttpResponse>) () -> {
try {
return httpClient.execute(req, context);
} catch (IOException ex) {
logBackPressure();
throw new HecException("Encountered exception while posting data.", ex);
}
});
} catch (Exception le) {
throw new HecException(
"Encountered exception while authenticating via Kerberos.", le);
}
} else {
try {
resp = httpClient.execute(req, context);
} catch (Exception ex) {
logBackPressure();
throw new HecException("encountered exception when post data", ex);
}
}

return readAndCloseResponse(resp);
}


/**
* Creates the Kerberos configurations.
*
* @return map of kerberos configs
*/
/**
 * Assembles the Krb5LoginModule options used for keytab-based Kerberos login.
 *
 * @return map of JAAS login-module option names to values
 */
private Map<String, Object> kerberosConfigMap() {
    Map<String, Object> options = new HashMap<>();
    // Authenticate non-interactively from the configured keytab/principal.
    options.put("useKeyTab", "true");
    options.put("keyTab", hecConfig.kerberosKeytabLocation());
    options.put("principal", hecConfig.kerberosPrincipal());
    options.put("doNotPrompt", "true");
    options.put("storeKey", "false");
    // Reuse cached tickets and renew the TGT when possible.
    options.put("useTicketCache", "true");
    options.put("renewTGT", "true");
    options.put("refreshKrb5Config", "true");
    return options;
}

/**
 * Initializes the JAAS {@link Configuration} used for Kerberos logins. Every
 * application name resolves to a single required Krb5LoginModule entry whose
 * options come from {@code kerberosConfigMap()}.
 */
private void defineKerberosConfigs() {
    config = new Configuration() {
        @Override
        public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
            AppConfigurationEntry entry = new AppConfigurationEntry(
                Krb5LoginModule.class.getName(),
                AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
                kerberosConfigMap());
            return new AppConfigurationEntry[] { entry };
        }
    };
}


private String readAndCloseResponse(CloseableHttpResponse resp) {
String respPayload;
HttpEntity entity = resp.getEntity();
Expand All @@ -181,6 +253,8 @@ private String readAndCloseResponse(CloseableHttpResponse resp) {
poller.setStickySessionToTrue();
}



int status = resp.getStatusLine().getStatusCode();
// FIXME 503 server is busy backpressure
if (status != 200 && status != 201) {
Expand Down
47 changes: 46 additions & 1 deletion src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,15 @@
*/
package com.splunk.kafka.connect;

import com.splunk.kafka.connect.VersionUtils;
import static com.splunk.kafka.connect.SplunkSinkConnectorConfig.KERBEROS_KEYTAB_PATH_CONF;
import static com.splunk.kafka.connect.SplunkSinkConnectorConfig.KERBEROS_USER_PRINCIPAL_CONF;

import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;

Expand All @@ -30,6 +37,8 @@
public final class SplunkSinkConnector extends SinkConnector {
private static final Logger log = LoggerFactory.getLogger(SplunkSinkConnector.class);
private Map<String, String> taskConfig;
private Map<String, ConfigValue> values;
private List<ConfigValue> validations;

@Override
public void start(Map<String, String> taskConfig) {
Expand Down Expand Up @@ -66,4 +75,40 @@ public String version() {
/**
 * Returns the connector's configuration definition, used by Kafka Connect for
 * validation and documentation of the connector's settings.
 */
public ConfigDef config() {
return SplunkSinkConnectorConfig.conf();
}


/**
 * Validates the supplied connector configuration. Runs the framework's
 * standard validation first, then layers on the Kerberos cross-field checks.
 *
 * @param connectorConfigs the raw connector configuration
 * @return the combined validation result
 */
@Override
public Config validate(final Map<String, String> connectorConfigs) {
    final Config baseResult = super.validate(connectorConfigs);
    validations = baseResult.configValues();
    // Index the per-key validation results by name so validateKerberosConfigs()
    // can attach error messages to specific configuration keys.
    values = validations.stream()
        .collect(Collectors.toMap(ConfigValue::name, Function.identity()));

    validateKerberosConfigs(connectorConfigs);
    return new Config(validations);
}

/**
 * Ensures the Kerberos settings are consistent: the keytab path and user
 * principal must either both be provided or both be left unset. When exactly
 * one of them is present, an error message is attached to both Kerberos keys.
 *
 * @param configs the raw connector configuration
 */
void validateKerberosConfigs(final Map<String, String> configs) {
    final String keytab = configs.getOrDefault(KERBEROS_KEYTAB_PATH_CONF, "");
    final String principal = configs.getOrDefault(KERBEROS_USER_PRINCIPAL_CONF, "");

    final boolean keytabProvided = StringUtils.isNotEmpty(keytab);
    final boolean principalProvided = StringUtils.isNotEmpty(principal);

    // Valid states: both provided (Kerberos enabled) or neither (disabled).
    if (keytabProvided == principalProvided) {
        return;
    }

    String errorMessage = String.format(
        "Either both or neither '%s' and '%s' must be set for Kerberos authentication. ",
        KERBEROS_KEYTAB_PATH_CONF,
        KERBEROS_USER_PRINCIPAL_CONF
    );
    addErrorMessage(KERBEROS_KEYTAB_PATH_CONF, errorMessage);
    addErrorMessage(KERBEROS_USER_PRINCIPAL_CONF, errorMessage);
}

/**
 * Appends a validation error to the ConfigValue for the given property.
 * NOTE(review): assumes {@code property} exists in the {@code values} map
 * populated by validate(); an unknown key would NPE — confirm callers only
 * pass declared configuration keys.
 */
private void addErrorMessage(String property, String error) {
values.get(property).addErrorMessage(error);
}
}
Loading