[feat](catalog) Support Pre-Execution Authentication for HMS Type Iceberg Catalog Operations (apache#43445)

### What problem does this PR solve?

Support Pre-Execution Authentication for HMS Type Iceberg Catalog Operations.

#### Summary
This PR introduces a new utility class, PreExecutionAuthenticator, which
ensures that HMS (Hive Metastore) type operations on Iceberg catalogs are
authenticated before they execute. This is especially useful in environments
that require secure access, such as Kerberos-secured Hadoop ecosystems. With
PreExecutionAuthenticator integrated, each relevant operation undergoes an
authentication step prior to execution, maintaining security compliance.
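
For illustration, a minimal wiring sketch using only the APIs this PR adds. This is hedged, not the shipped code: the `Configuration` below is a placeholder, and the real wiring happens in `IcebergHMSExternalCatalog.initCatalog()`, shown in the diff further down.

```java
import org.apache.doris.common.security.authentication.AuthenticationConfig;
import org.apache.doris.common.security.authentication.HadoopAuthenticator;
import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;

import org.apache.hadoop.conf.Configuration;

public class PreExecutionAuthenticatorExample {
    public static void main(String[] args) throws Exception {
        // Placeholder configuration; a real catalog passes its own Hadoop conf.
        Configuration conf = new Configuration();

        PreExecutionAuthenticator auth = new PreExecutionAuthenticator();
        // With a HadoopAuthenticator set, execute(...) runs the task inside a
        // privileged (e.g. Kerberos) context via doAs; without one, it runs
        // the task directly.
        auth.setHadoopAuthenticator(
                HadoopAuthenticator.getHadoopAuthenticator(
                        AuthenticationConfig.getKerberosConfig(conf)));

        String result = auth.execute(() -> "ran with pre-execution authentication");
        System.out.println(result);
    }
}
```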

### Motivation
In environments that use an Iceberg catalog with an HMS backend, many
operations require authentication before they can access secure data or
perform privileged tasks. Because operations on HMS-type catalogs typically
run within a Hadoop environment secured by Kerberos, each operation must
execute within an authenticated context. Previously, there was no
standardized mechanism to enforce pre-execution authentication, which left
potential security gaps. This PR addresses that gap by introducing an
extensible authentication utility.

### Key Changes

**Addition of the PreExecutionAuthenticator utility class**

- Provides a standard way to perform pre-execution authentication for tasks.
- Leverages HadoopAuthenticator (when available) to execute tasks within a privileged context using doAs.
- Supports execution with or without authentication, enabling flexibility for both secure and non-secure environments.

**Integration with Iceberg catalog operations**

- All relevant HMS-type catalog operations now go through PreExecutionAuthenticator for pre-execution authentication.
- Ensures that operations such as createDb, dropDb, and other privileged tasks are executed only after authentication; see the sketch after this list.

**Extensible design**

- PreExecutionAuthenticator can be adapted to future authentication methods beyond Hadoop and Kerberos, if needed.
- The CallableToPrivilegedExceptionActionAdapter class allows any Callable task to be executed as a PrivilegedExceptionAction, making it versatile for various task types.
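
As a sketch of the void-operation pattern used throughout `IcebergMetadataOps` (the `preExecutionAuthenticator`, `performCreateDb`, and `stmt` names are taken from the diff below; the surrounding class is assumed):

```java
// Void operations are wrapped in a Callable<Void> that returns null;
// execute(...) routes the call through doAs when an authenticator is set.
preExecutionAuthenticator.execute(() -> {
    performCreateDb(stmt); // privileged work runs authenticated
    return null;           // satisfies Callable<Void>
});
```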


### Check List (For Author)

- Test

    - [x] Manual test (add detailed scripts or steps below)
```
mysql> CREATE TABLE ha
    ->        (
    ->            vendor_id BIGINT,
    ->            trip_id BIGINT,
    ->            trip_distance FLOAT,
    ->            fare_amount DOUBLE,
    ->            store_and_fwd_flag STRING,
    ->            ts DATETIME
    ->        );
Query OK, 0 rows affected (2.08 sec)

mysql> show create table ha;
+-------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Table | Create Table                                                                                                                                                                                                                                                                                                                                                                                                              |
+-------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| ha    | CREATE TABLE `ha` (
  `vendor_id` bigint NULL,
  `trip_id` bigint NULL,
  `trip_distance` float NULL,
  `fare_amount` double NULL,
  `store_and_fwd_flag` text NULL,
  `ts` datetimev2(6) NULL
) ENGINE=ICEBERG_EXTERNAL_TABLE
LOCATION 'xxxxx'
PROPERTIES (
  "doris.version" = "doris-2.1.6-rc04-67ee7f53e6",
  "write.parquet.compression-codec" = "zstd"
);

mysql>        INSERT INTO iceberg.ck_iceberg.ha
    ->        VALUES
    ->         (1, 1000371, 1.8, 15.32, 'N', '2024-01-01 9:15:23'),
    ->         (2, 1000372, 2.5, 22.15, 'N', '2024-01-02 12:10:11'),
    ->         (2, 1000373, 0.9, 9.01, 'N', '2024-01-01 3:25:15'),
    ->         (1, 1000374, 8.4, 42.13, 'Y', '2024-01-03 7:12:33');  
Query OK, 4 rows affected (5.10 sec)
{'status':'COMMITTED', 'txnId':'35030'}

mysql> select * from ha;
+-----------+---------+---------------+-------------+--------------------+----------------------------+
| vendor_id | trip_id | trip_distance | fare_amount | store_and_fwd_flag | ts                         |
+-----------+---------+---------------+-------------+--------------------+----------------------------+
|         1 | 1000371 |           1.8 |       15.32 | N                  | 2024-01-01 09:15:23.000000 |
|         2 | 1000372 |           2.5 |       22.15 | N                  | 2024-01-02 12:10:11.000000 |
|         2 | 1000373 |           0.9 |        9.01 | N                  | 2024-01-01 03:25:15.000000 |
|         1 | 1000374 |           8.4 |       42.13 | Y                  | 2024-01-03 07:12:33.000000 |
+-----------+---------+---------------+-------------+--------------------+----------------------------+
4 rows in set (1.20 sec)
```
CalvinKirs authored Nov 18, 2024 · 1 parent f7bc15f · commit 9b983ca
Showing 6 changed files with 204 additions and 11 deletions.
@@ -0,0 +1,116 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.common.security.authentication;

import java.security.PrivilegedExceptionAction;
import java.util.concurrent.Callable;

/**
 * PreExecutionAuthenticator is a utility class that ensures specified tasks
 * are executed with necessary authentication, particularly useful for systems
 * like Hadoop that require Kerberos-based pre-execution authentication.
 *
 * <p>If a HadoopAuthenticator is provided, this class will execute tasks
 * within a privileged context using Hadoop's authentication mechanisms
 * (such as Kerberos). Otherwise, it will execute tasks normally.
 */
public class PreExecutionAuthenticator {

    private HadoopAuthenticator hadoopAuthenticator;

    /**
     * Default constructor for PreExecutionAuthenticator.
     * This allows setting the HadoopAuthenticator at a later point if needed.
     */
    public PreExecutionAuthenticator() {
    }

    /**
     * Executes the specified task with necessary authentication.
     *
     * <p>If a HadoopAuthenticator is set, the task will be executed within a
     * privileged context using the doAs method. If no authenticator is present,
     * the task will be executed directly.
     *
     * @param task The task to execute, represented as a Callable
     * @param <T>  The type of the result returned by the task
     * @return The result of the executed task
     * @throws Exception If an exception occurs during task execution
     */
    public <T> T execute(Callable<T> task) throws Exception {
        if (hadoopAuthenticator != null) {
            // Adapts the Callable to a PrivilegedExceptionAction for use
            // with Hadoop authentication
            PrivilegedExceptionAction<T> action = new CallableToPrivilegedExceptionActionAdapter<>(task);
            return hadoopAuthenticator.doAs(action);
        } else {
            // Executes the task directly if no authentication is needed
            return task.call();
        }
    }

    /**
     * Retrieves the current HadoopAuthenticator.
     *
     * <p>This allows checking if a HadoopAuthenticator is configured or
     * changing it at runtime.
     *
     * @return The current HadoopAuthenticator instance, or null if none is set
     */
    public HadoopAuthenticator getHadoopAuthenticator() {
        return hadoopAuthenticator;
    }

    /**
     * Sets the HadoopAuthenticator, enabling pre-execution authentication
     * for tasks requiring privileged access.
     *
     * @param hadoopAuthenticator An instance of HadoopAuthenticator to be used
     */
    public void setHadoopAuthenticator(HadoopAuthenticator hadoopAuthenticator) {
        this.hadoopAuthenticator = hadoopAuthenticator;
    }

    /**
     * Adapter class to convert a Callable into a PrivilegedExceptionAction.
     *
     * <p>This is necessary to run the task within a privileged context,
     * particularly for Hadoop operations with Kerberos.
     *
     * @param <T> The type of result returned by the action
     */
    public class CallableToPrivilegedExceptionActionAdapter<T> implements PrivilegedExceptionAction<T> {
        private final Callable<T> callable;

        /**
         * Constructs an adapter that wraps a Callable into a PrivilegedExceptionAction.
         *
         * @param callable The Callable to be adapted
         */
        public CallableToPrivilegedExceptionActionAdapter(Callable<T> callable) {
            this.callable = callable;
        }

        /**
         * Executes the wrapped Callable as a PrivilegedExceptionAction.
         *
         * @return The result of the callable's call method
         * @throws Exception If an exception occurs during callable execution
         */
        @Override
        public T run() throws Exception {
            return callable.call();
        }
    }
}
@@ -17,6 +17,7 @@

package org.apache.doris.datasource.iceberg;

import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.InitCatalogLog;
import org.apache.doris.datasource.SessionContext;
@@ -42,6 +43,8 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog {
    protected String icebergCatalogType;
    protected Catalog catalog;

    protected PreExecutionAuthenticator preExecutionAuthenticator;

    public IcebergExternalCatalog(long catalogId, String name, String comment) {
        super(catalogId, name, InitCatalogLog.Type.ICEBERG, comment);
    }
@@ -51,6 +54,7 @@ public IcebergExternalCatalog(long catalogId, String name, String comment) {

    @Override
    protected void initLocalObjectsImpl() {
        preExecutionAuthenticator = new PreExecutionAuthenticator();
        initCatalog();
        IcebergMetadataOps ops = ExternalMetadataOperations.newIcebergMetadataOps(this, catalog);
        transactionManager = TransactionManagerFactory.createIcebergTransactionManager(ops);
@@ -17,6 +17,8 @@

package org.apache.doris.datasource.iceberg;

import org.apache.doris.common.security.authentication.AuthenticationConfig;
import org.apache.doris.common.security.authentication.HadoopAuthenticator;
import org.apache.doris.datasource.CatalogProperty;
import org.apache.doris.datasource.property.PropertyConverter;

@@ -35,6 +37,11 @@ public IcebergHMSExternalCatalog(long catalogId, String name, String resource, M
    protected void initCatalog() {
        icebergCatalogType = ICEBERG_HMS;
        catalog = IcebergUtils.createIcebergHiveCatalog(this, getName());
        if (preExecutionAuthenticator.getHadoopAuthenticator() == null) {
            AuthenticationConfig config = AuthenticationConfig.getKerberosConfig(getConfiguration());
            HadoopAuthenticator authenticator = HadoopAuthenticator.getHadoopAuthenticator(config);
            preExecutionAuthenticator.setHadoopAuthenticator(authenticator);
        }
    }
}

@@ -28,6 +28,7 @@
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
import org.apache.doris.datasource.DorisTypeVisitor;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.ExternalDatabase;
@@ -53,11 +54,14 @@ public class IcebergMetadataOps implements ExternalMetadataOps {
    protected Catalog catalog;
    protected IcebergExternalCatalog dorisCatalog;
    protected SupportsNamespaces nsCatalog;
    private PreExecutionAuthenticator preExecutionAuthenticator;

    public IcebergMetadataOps(IcebergExternalCatalog dorisCatalog, Catalog catalog) {
        this.dorisCatalog = dorisCatalog;
        this.catalog = catalog;
        nsCatalog = (SupportsNamespaces) catalog;
        this.preExecutionAuthenticator = dorisCatalog.preExecutionAuthenticator;
    }

    public Catalog getCatalog() {
@@ -82,9 +86,13 @@ public boolean databaseExist(String dbName) {
    }

    public List<String> listDatabaseNames() {
-        return nsCatalog.listNamespaces().stream()
-                .map(e -> e.toString())
-                .collect(Collectors.toList());
+        try {
+            return preExecutionAuthenticator.execute(() -> nsCatalog.listNamespaces().stream()
+                    .map(Namespace::toString)
+                    .collect(Collectors.toList()));
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to list database names, error message is: " + e.getMessage(), e);
+        }
    }


@@ -96,6 +104,19 @@ public List<String> listTableNames(String dbName) {

    @Override
    public void createDb(CreateDbStmt stmt) throws DdlException {
        try {
            preExecutionAuthenticator.execute(() -> {
                performCreateDb(stmt);
                return null;
            });
        } catch (Exception e) {
            throw new DdlException("Failed to create database: "
                    + stmt.getFullDbName() + ", error message is: " + e.getMessage(), e);
        }
    }

    private void performCreateDb(CreateDbStmt stmt) throws DdlException {
        SupportsNamespaces nsCatalog = (SupportsNamespaces) catalog;
        String dbName = stmt.getFullDbName();
        Map<String, String> properties = stmt.getProperties();
@@ -110,14 +131,25 @@ public void createDb(CreateDbStmt stmt) throws DdlException {
        String icebergCatalogType = dorisCatalog.getIcebergCatalogType();
        if (!properties.isEmpty() && !IcebergExternalCatalog.ICEBERG_HMS.equals(icebergCatalogType)) {
            throw new DdlException(
-                "Not supported: create database with properties for iceberg catalog type: " + icebergCatalogType);
+                    "Not supported: create database with properties for iceberg catalog type: " + icebergCatalogType);
        }
        nsCatalog.createNamespace(Namespace.of(dbName), properties);
        dorisCatalog.onRefreshCache(true);
    }

    @Override
    public void dropDb(DropDbStmt stmt) throws DdlException {
        try {
            preExecutionAuthenticator.execute(() -> {
                performDropDb(stmt);
                return null;
            });
        } catch (Exception e) {
            throw new DdlException("Failed to drop database: " + stmt.getDbName()
                    + ", error message is: " + e.getMessage(), e);
        }
    }

    private void performDropDb(DropDbStmt stmt) throws DdlException {
        String dbName = stmt.getDbName();
        if (!databaseExist(dbName)) {
            if (stmt.isSetIfExists()) {
@@ -134,6 +166,15 @@ public void dropDb(DropDbStmt stmt) throws DdlException {

    @Override
    public boolean createTable(CreateTableStmt stmt) throws UserException {
        try {
            preExecutionAuthenticator.execute(() -> performCreateTable(stmt));
        } catch (Exception e) {
            throw new DdlException("Failed to create table: " + stmt.getTableName()
                    + ", error message is: " + e.getMessage(), e);
        }
        return false;
    }

    public boolean performCreateTable(CreateTableStmt stmt) throws UserException {
        String dbName = stmt.getDbName();
        ExternalDatabase<?> db = dorisCatalog.getDbNullable(dbName);
        if (db == null) {
@@ -166,6 +207,17 @@ public boolean createTable(CreateTableStmt stmt) throws UserException {

    @Override
    public void dropTable(DropTableStmt stmt) throws DdlException {
        try {
            preExecutionAuthenticator.execute(() -> {
                performDropTable(stmt);
                return null;
            });
        } catch (Exception e) {
            throw new DdlException("Failed to drop table: " + stmt.getTableName()
                    + ", error message is: " + e.getMessage(), e);
        }
    }

    private void performDropTable(DropTableStmt stmt) throws DdlException {
        String dbName = stmt.getDbName();
        String tableName = stmt.getTableName();
        ExternalDatabase<?> db = dorisCatalog.getDbNullable(dbName);
@@ -194,4 +246,8 @@ public void dropTable(DropTableStmt stmt) throws DdlException {
    public void truncateTable(String dbName, String tblName, List<String> partitions) {
        throw new UnsupportedOperationException("Truncate Iceberg table is not supported.");
    }

    public PreExecutionAuthenticator getPreExecutionAuthenticator() {
        return preExecutionAuthenticator;
    }
}
@@ -82,14 +82,23 @@ public void finishInsert(SimpleTableInfo tableInfo, Optional<InsertCommandContex
        if (LOG.isDebugEnabled()) {
            LOG.info("iceberg table {} insert table finished!", tableInfo);
        }
-
-        //create and start the iceberg transaction
-        TUpdateMode updateMode = TUpdateMode.APPEND;
-        if (insertCtx.isPresent()) {
-            updateMode = ((BaseExternalTableInsertCommandContext) insertCtx.get()).isOverwrite() ? TUpdateMode.OVERWRITE
-                    : TUpdateMode.APPEND;
+        try {
+            ops.getPreExecutionAuthenticator().execute(() -> {
+                //create and start the iceberg transaction
+                TUpdateMode updateMode = TUpdateMode.APPEND;
+                if (insertCtx.isPresent()) {
+                    updateMode = ((BaseExternalTableInsertCommandContext) insertCtx.get()).isOverwrite()
+                            ? TUpdateMode.OVERWRITE
+                            : TUpdateMode.APPEND;
+                }
+                updateManifestAfterInsert(updateMode);
+                return null;
+            });
+        } catch (Exception e) {
+            LOG.warn("Failed to finish insert for iceberg table {}.", tableInfo, e);
+            throw new RuntimeException(e);
        }
-        updateManifestAfterInsert(updateMode);

    }

    private void updateManifestAfterInsert(TUpdateMode updateMode) {
@@ -87,6 +87,7 @@ private void createCatalog() throws IOException {
        hadoopCatalog.setConf(new Configuration());
        hadoopCatalog.initialize("df", props);
        this.externalCatalog = new IcebergHMSExternalCatalog(1L, "iceberg", "", Maps.newHashMap(), "");
        externalCatalog.initLocalObjectsImpl();
        new MockUp<IcebergHMSExternalCatalog>() {
            @Mock
            public Catalog getCatalog() {
