Skip to content

Commit

Permalink
Certificate role based authorization in Apache Bookkeeper. (#2429)
Browse files Browse the repository at this point in the history
This feature allows a predefined set of services to be 'whitelisted' to be able to access bookkeeper based on their client certificates. _This feature is disabled by default._


**Motivation**
As BookKeeper and its supported services move to a cloud friendly service based architecture, it becomes of utmost importance to monitor and allow only certain qualified services to be able to access the data in BK.
We have TLS based authentication, however, any service with the rootCA can access Bookkeeper clusters which is not desirable.

**Changes**

To enable if, we have to set 2 configuration options in server config.

1. Set `bookieAuthProviderFactoryClass` config option to use BookieAuthZFactory
`bookieAuthProviderFactoryClass=org.apache.bookkeeper.tls.BookieAuthZFactory`

2. Set `authorizedRoles` to a comma separated list of roles present in client certificates' OU field.
`authorizedRoles=pulsar-broker-1,pulsar-broker-2`

Read further for details on how to implement these in your client certificates and how to wire it up.

So this feature can be broken down into two parts:

    Certificate and roles
    Server configuration for authorized roles

**Details:**
_Certificate and roles:_
Here is an example of how the SUBJECT field of a final certificate for Apache Pulsar running in the cloud would look like:

    CN=apache.bookkeeper.org
    O=apache-pulsar
    OU=0:pulsar-broker-role;1:cluster-1
    L=San Francisco
    S=CA
    C=US

This shows that this bookkeeper client certificate is owned by the apache pulsar service has the role ‘pulsar-broker-role’ for entities in ‘cluster-1’.
Only those services with pulsar-broker-role should be able to access it.
We can add more fields separated by commas to increase the upstream application clusters to be able to access this bookkeeper cluster.

For example: `OU=0:herddb-readonlyNode,herddb-readwriteNode;1:herddb-cluster2`

Such separation of access based on services is paramount to keeping this secure as many upstream users of BookKeeper are financial institutions, databases and other services.

_Server configuration for authorized roles_
Once we have a certificate whose SUBJECT field has the OU attribute with the roles we want to authorize, on the Bookie side, we need to specify which roles are allowed.
We make this happen by introducing a server configuration option called `authorizedRoles`.
Since we have only static options, this will be set in stone as long as the bookie booted up with it.
If in case we need to change the allowed roles, we’ll need to stop the bookie, update the configuration file and then restart the bookie.
We can have multiple roles which are authorized as the OU field can have multiple comma separated values for roles.

This is a redo of stale PR #2355 

Master Issue: #2354
  • Loading branch information
Ghatage authored Oct 8, 2020
1 parent 1ab7614 commit 3a8f4b4
Show file tree
Hide file tree
Showing 17 changed files with 578 additions and 162 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,9 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
// Perform local consistency check on bookie startup
protected static final String LOCAL_CONSISTENCY_CHECK_ON_STARTUP = "localConsistencyCheckOnStartup";

// Certificate role based authorization
protected static final String AUTHORIZED_ROLES = "authorizedRoles";

/**
* Construct a default configuration object.
*/
Expand Down Expand Up @@ -3382,4 +3385,23 @@ public ServerConfiguration setEntryLogPerLedgerCounterLimitsMultFactor(
public boolean isLocalConsistencyCheckOnStartup() {
return this.getBoolean(LOCAL_CONSISTENCY_CHECK_ON_STARTUP, false);
}

/**
* Get the authorized roles.
*
* @return String array of configured auth roles.
*/
public String[] getAuthorizedRoles() {
return getStringArray(AUTHORIZED_ROLES);
}

/**
* Set authorized roles.
*
* @return Configuration Object with roles set
*/
public ServerConfiguration setAuthorizedRoles(String roles) {
this.setProperty(AUTHORIZED_ROLES, roles);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,10 @@ private boolean checkAuthPlugin(AuthMessage am, final Channel src) {
return true;
}

public boolean isAuthenticated() {
return authenticated;
}

static class AuthResponseCallbackLegacy implements AuthCallbacks.GenericCallback<AuthToken> {
final BookieProtocol.AuthRequest req;
final Channel channel;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -564,10 +564,19 @@ public void operationComplete(Future<Channel> future) throws Exception {
AuthHandler.ServerSideHandler authHandler = c.pipeline()
.get(AuthHandler.ServerSideHandler.class);
authHandler.authProvider.onProtocolUpgrade();
if (future.isSuccess()) {

/*
* Success of the future doesn't guarantee success in authentication
* future.isSuccess() only checks if the result field is not null
*/
if (future.isSuccess() && authHandler.isAuthenticated()) {
LOG.info("Session is protected by: {}", sslHandler.engine().getSession().getCipherSuite());
} else {
LOG.error("TLS Handshake failure.", future.cause());
if (future.isSuccess()) {
LOG.error("TLS Handshake failed: Could not authenticate.");
} else {
LOG.error("TLS Handshake failure: {} ", future.cause());
}
BookkeeperProtocol.Response.Builder errResponse = BookkeeperProtocol.Response.newBuilder()
.setHeader(r.getHeader()).setStatus(BookkeeperProtocol.StatusCode.EIO);
c.writeAndFlush(errResponse.build());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.bookkeeper.tls;

import com.google.common.base.Strings;

import java.io.IOException;
import java.security.cert.X509Certificate;
import java.util.Collection;

import lombok.extern.slf4j.Slf4j;

import org.apache.bookkeeper.auth.AuthCallbacks;
import org.apache.bookkeeper.auth.AuthToken;
import org.apache.bookkeeper.auth.BookKeeperPrincipal;
import org.apache.bookkeeper.auth.BookieAuthProvider;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.proto.BookieConnectionPeer;
import org.apache.bookkeeper.util.CertUtils;


/**
* Authorization factory class.
*/
@Slf4j
public class BookieAuthZFactory implements BookieAuthProvider.Factory {

public String[] allowedRoles;

@Override
public String getPluginName() {
return "BookieAuthZFactory";
}

@Override
public void init(ServerConfiguration conf) throws IOException {
// Read from config
allowedRoles = conf.getAuthorizedRoles();

if (allowedRoles == null || allowedRoles.length == 0) {
throw new RuntimeException("Configuration option \'bookieAuthProviderFactoryClass\' is set to"
+ " \'BookieAuthZFactory\' but no roles set for configuration field \'authorizedRoles\'.");
}

// If authorization is enabled and there are no roles, exit
for (String allowedRole : allowedRoles) {
if (Strings.isNullOrEmpty(allowedRole)) {
throw new RuntimeException("Configuration option \'bookieAuthProviderFactoryClass\' is set to"
+ " \'BookieAuthZFactory\' but no roles set for configuration field \'authorizedRoles\'.");
}
}
}

@Override
public BookieAuthProvider newProvider(BookieConnectionPeer addr,
final AuthCallbacks.GenericCallback<Void> completeCb) {
return new BookieAuthProvider() {

AuthCallbacks.GenericCallback<Void> completeCallback = completeCb;

@Override
public void onProtocolUpgrade() {

try {
boolean secureBookieSideChannel = addr.isSecure();
Collection<Object> certificates = addr.getProtocolPrincipals();
if (secureBookieSideChannel && !certificates.isEmpty()
&& certificates.iterator().next() instanceof X509Certificate) {
X509Certificate tempCert = (X509Certificate) certificates.iterator().next();
String[] certRole = CertUtils.getRolesFromOU(tempCert);
if (certRole == null || certRole.length == 0) {
log.error("AuthZ failed: No cert role in OU field of certificate. Must have a role from "
+ "allowedRoles list {} host: {}",
allowedRoles, addr.getRemoteAddr());
completeCallback.operationComplete(BKException.Code.UnauthorizedAccessException, null);
return;
}
boolean authorized = false;
for (String allowedRole : allowedRoles) {
if (certRole[0].equals(allowedRole)) {
authorized = true;
}
}
if (authorized) {
addr.setAuthorizedId(new BookKeeperPrincipal(certRole[0]));
completeCallback.operationComplete(BKException.Code.OK, null);
} else {
log.error("AuthZ failed: Cert role {} doesn't match allowedRoles list {}; host: {}",
certRole, allowedRoles, addr.getRemoteAddr());
completeCallback.operationComplete(BKException.Code.UnauthorizedAccessException, null);
}
} else {
if (!secureBookieSideChannel) {
log.error("AuthZ failed: Bookie side channel is not secured; host: {}",
addr.getRemoteAddr());
} else if (certificates.isEmpty()) {
log.error("AuthZ failed: Certificate missing; host: {}", addr.getRemoteAddr());
} else {
log.error("AuthZ failed: Certs are missing or not X509 type; host: {}",
addr.getRemoteAddr());
}
completeCallback.operationComplete(BKException.Code.UnauthorizedAccessException, null);
}
} catch (Exception e) {
log.error("AuthZ failed: Failed to parse certificate; host: {}, {}", addr.getRemoteAddr(), e);
completeCallback.operationComplete(BKException.Code.UnauthorizedAccessException, null);
}
}

@Override
public void process(AuthToken m, AuthCallbacks.GenericCallback<AuthToken> cb) {
}
};
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.bookkeeper.util;

import java.io.IOException;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import javax.naming.InvalidNameException;
import javax.naming.ldap.LdapName;
import javax.naming.ldap.Rdn;

/**
* Certificate parsing utilities.
*/
public abstract class CertUtils {

// OU values
public static final String OU_ROLE_NAME_CODE = "0";
public static final String OU_CLUSTER_NAME_CODE = "1";

public static final String OU_VALUES_SEPARATOR = ";";
public static final String OU_CODE_SEPARATOR = ":";
public static final String OU_NAME_SEPARATOR = ",";

static final Pattern OU_VALUES_SPLITTER = Pattern.compile(OU_VALUES_SEPARATOR);
static final Pattern OU_GENERAL_NAME_REGEX = Pattern.compile("^([0-9]+)" + OU_CODE_SEPARATOR + "(.*)$");
static final Pattern OU_NAME_SPLITTER = Pattern.compile(OU_NAME_SEPARATOR);

private CertUtils() {
}

public static String getOUString(X509Certificate cert) throws IOException {
return getOUStringFromSubject(cert.getSubjectX500Principal().getName());
}

public static String getOUStringFromSubject(String subject) throws IOException {
try {
LdapName ldapDN = new LdapName(subject);
for (Rdn rdn : ldapDN.getRdns()) {
if ("OU".equalsIgnoreCase(rdn.getType())) {
return rdn.getValue().toString();
}
}
return null;
} catch (InvalidNameException ine) {
throw new IOException(ine);
}
}

public static Map<String, String> getOUMapFromOUString(String ou) throws IOException {
Map<String, String> ouMap = new HashMap<>();
if (ou != null) {
String[] ouParts = OU_VALUES_SPLITTER.split(ou);
for (String ouPart : ouParts) {
Matcher matcher = OU_GENERAL_NAME_REGEX.matcher(ouPart);
if (matcher.find() && matcher.groupCount() == 2) {
ouMap.put(matcher.group(1).trim(), matcher.group(2).trim());
}
}
}
return Collections.unmodifiableMap(ouMap);
}

public static Map<String, String> getOUMap(X509Certificate cert) throws IOException {
return getOUMapFromOUString(getOUString(cert));
}

public static String[] getRolesFromOU(X509Certificate cert) throws IOException {
return getRolesFromOUMap(getOUMap(cert));
}

public static String[] getRolesFromOUMap(Map<String, String> ouMap) throws IOException {
String roleNames = ouMap.get(OU_ROLE_NAME_CODE);
if (roleNames != null) {
String[] roleParts = OU_NAME_SPLITTER.split(roleNames);
if (roleParts.length > 0) {
List<String> roles = new ArrayList<>(roleParts.length);
for (String role : roleParts) {
roles.add(role.trim());
}
return roles.toArray(new String[roles.size()]);
}
}
return null;
}
}
Loading

0 comments on commit 3a8f4b4

Please sign in to comment.