Skip to content

Commit

Permalink
[Feature] Iceberg connector supports custom catalog (#8931)
Browse files Browse the repository at this point in the history
  • Loading branch information
mxdzs0612 authored Aug 1, 2022
1 parent 12c19a2 commit 54ff4b0
Show file tree
Hide file tree
Showing 10 changed files with 180 additions and 44 deletions.
16 changes: 15 additions & 1 deletion fe/fe-core/src/main/java/com/starrocks/catalog/IcebergTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class IcebergTable extends Table {

public static final String ICEBERG_CATALOG = "starrocks.catalog-type";
public static final String ICEBERG_METASTORE_URIS = "iceberg.catalog.hive.metastore.uris";
private static final String ICEBERG_IMPL = "iceberg.catalog-impl";
public static final String ICEBERG_IMPL = "iceberg.catalog-impl";
public static final String ICEBERG_DB = "database";
public static final String ICEBERG_TABLE = "table";
public static final String ICEBERG_RESOURCE = "resource";
Expand All @@ -75,6 +75,10 @@ public IcebergTable(long id, String name, List<Column> schema, Map<String, Strin
setHiveCatalogProperties(properties, metastoreURI);
return;
}
if (null != properties.get(ICEBERG_IMPL)) {
setCustomCatalogProperties(properties);
return;
}
validate(properties);
}

Expand Down Expand Up @@ -139,6 +143,16 @@ private void setHiveCatalogProperties(Map<String, String> properties, String met
icebergProperties.put(ICEBERG_CATALOG, "HIVE_CATALOG");
}

/**
 * Configures this table for a user-supplied (custom) Iceberg catalog implementation.
 * Consumes {@code database}, {@code table} and the catalog-impl class name from
 * {@code properties}; every remaining entry is forwarded verbatim to the custom
 * catalog via {@code icebergProperties}.
 *
 * <p>Note: {@code properties} is mutated (entries are removed) by this call.
 *
 * @param properties raw table properties from the CREATE statement; must contain
 *                   {@link #ICEBERG_IMPL} (caller checks this before dispatching here)
 */
private void setCustomCatalogProperties(Map<String, String> properties) {
    db = properties.remove(ICEBERG_DB);
    table = properties.remove(ICEBERG_TABLE);
    icebergProperties.put(ICEBERG_CATALOG, "CUSTOM_CATALOG");
    icebergProperties.put(ICEBERG_IMPL, properties.remove(ICEBERG_IMPL));
    // Bulk-copy instead of a manual entrySet loop; semantics are identical.
    icebergProperties.putAll(properties);
}

private void validate(Map<String, String> properties) throws DdlException {
if (properties == null) {
throw new DdlException("Please set properties of iceberg table, they are: database, table.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@

package com.starrocks.connector.iceberg;

import com.google.common.base.Preconditions;
import com.starrocks.common.DdlException;
import com.starrocks.common.util.Util;
import com.starrocks.connector.Connector;
import com.starrocks.connector.ConnectorContext;
import com.starrocks.connector.ConnectorMetadata;
Expand All @@ -13,33 +11,23 @@

import java.util.Map;

import static com.starrocks.catalog.IcebergTable.ICEBERG_METASTORE_URIS;

public class IcebergConnector implements Connector {
private static final Logger LOG = LogManager.getLogger(IcebergConnector.class);

private final Map<String, String> properties;
private final String catalogName;
private String metastoreURI;
private ConnectorMetadata metadata;

public IcebergConnector(ConnectorContext context) {
this.catalogName = context.getCatalogName();
this.properties = context.getProperties();
validate();
}

public void validate() {
this.metastoreURI = Preconditions.checkNotNull(properties.get(ICEBERG_METASTORE_URIS),
"%s must be set in properties when creating iceberg catalog", ICEBERG_METASTORE_URIS);
Util.validateMetastoreUris(metastoreURI);
}

@Override
public ConnectorMetadata getMetadata() throws DdlException {
if (metadata == null) {
try {
metadata = new IcebergMetadata(metastoreURI);
metadata = new IcebergMetadata(properties);
} catch (Exception e) {
LOG.error("Failed to create iceberg metadata on [catalog : {}]", catalogName, e);
throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
import com.starrocks.catalog.Database;
import com.starrocks.catalog.Table;
import com.starrocks.common.DdlException;
import com.starrocks.common.util.Util;
import com.starrocks.connector.ConnectorMetadata;
import com.starrocks.external.iceberg.IcebergHiveCatalog;
import com.starrocks.external.iceberg.IcebergCatalog;
import com.starrocks.external.iceberg.IcebergCatalogType;
import com.starrocks.external.iceberg.IcebergUtil;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
Expand All @@ -15,35 +17,52 @@
import org.apache.thrift.TException;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static com.starrocks.catalog.IcebergTable.ICEBERG_CATALOG;
import static com.starrocks.catalog.IcebergTable.ICEBERG_IMPL;
import static com.starrocks.catalog.IcebergTable.ICEBERG_METASTORE_URIS;
import static com.starrocks.external.iceberg.IcebergUtil.getIcebergCustomCatalog;
import static com.starrocks.external.iceberg.IcebergUtil.getIcebergHiveCatalog;

public class IcebergMetadata implements ConnectorMetadata {

private static final Logger LOG = LogManager.getLogger(IcebergMetadata.class);
private final String metastoreURI;
private IcebergHiveCatalog hiveCatalog;
private String metastoreURI;
private String catalogType;
private String catalogImpl;
private IcebergCatalog icebergCatalog;
private Map<String, String> customProperties;

public IcebergMetadata(String metastoreURI) {
Map<String, String> properties = new HashMap<>();
// the first phase of IcebergConnector only supports hive catalog.
this.hiveCatalog = (IcebergHiveCatalog) getIcebergHiveCatalog(metastoreURI, properties);
this.metastoreURI = metastoreURI;
/**
 * Creates metadata access for an Iceberg connector backed by either a Hive
 * metastore catalog or a custom catalog implementation, selected by the
 * {@code ICEBERG_CATALOG} property.
 *
 * <p>Fixes over the previous revision: the catalog type is parsed once instead
 * of twice, and for the Hive path the metastore URI is validated BEFORE it is
 * handed to the catalog factory (previously the catalog was constructed from a
 * potentially invalid URI and validation ran afterwards).
 *
 * @param properties connector properties; mutated for the custom-catalog path
 *                   (bookkeeping keys are stripped before forwarding)
 * @throws RuntimeException if {@code ICEBERG_CATALOG} is missing or unsupported
 */
public IcebergMetadata(Map<String, String> properties) {
    IcebergCatalogType type = IcebergCatalogType.fromString(properties.get(ICEBERG_CATALOG));
    if (IcebergCatalogType.HIVE_CATALOG == type) {
        catalogType = properties.get(ICEBERG_CATALOG);
        metastoreURI = properties.get(ICEBERG_METASTORE_URIS);
        // Validate the URI before using it to build the catalog.
        Util.validateMetastoreUris(metastoreURI);
        icebergCatalog = getIcebergHiveCatalog(metastoreURI, properties);
    } else if (IcebergCatalogType.CUSTOM_CATALOG == type) {
        catalogType = properties.get(ICEBERG_CATALOG);
        catalogImpl = properties.get(ICEBERG_IMPL);
        icebergCatalog = getIcebergCustomCatalog(catalogImpl, properties);
        // Strip bookkeeping keys; the remainder is passed through to the custom catalog.
        properties.remove(ICEBERG_CATALOG);
        properties.remove(ICEBERG_IMPL);
        customProperties = properties;
    } else {
        throw new RuntimeException(String.format("Property %s is missing or not supported now.", ICEBERG_CATALOG));
    }
}

@Override
public List<String> listDbNames() throws DdlException {
return hiveCatalog.listAllDatabases();
return icebergCatalog.listAllDatabases();
}

@Override
public Database getDb(String dbName) {
try {
return hiveCatalog.getDB(dbName);
return icebergCatalog.getDB(dbName);
} catch (InterruptedException | TException e) {
LOG.error("Failed to get iceberg database " + dbName, e);
return null;
Expand All @@ -52,15 +71,19 @@ public Database getDb(String dbName) {

@Override
public List<String> listTableNames(String dbName) throws DdlException {
List<TableIdentifier> tableIdentifiers = hiveCatalog.listTables(Namespace.of(dbName));
List<TableIdentifier> tableIdentifiers = icebergCatalog.listTables(Namespace.of(dbName));
return tableIdentifiers.stream().map(TableIdentifier::name).collect(Collectors.toCollection(ArrayList::new));
}

@Override
public Table getTable(String dbName, String tblName) {
try {
org.apache.iceberg.Table icebergTable = hiveCatalog.loadTable(IcebergUtil.getIcebergTableIdentifier(dbName, tblName));
return IcebergUtil.convertToSRTable(icebergTable, metastoreURI, dbName, tblName);
org.apache.iceberg.Table icebergTable
= icebergCatalog.loadTable(IcebergUtil.getIcebergTableIdentifier(dbName, tblName));
if (IcebergCatalogType.fromString(catalogType).equals(IcebergCatalogType.CUSTOM_CATALOG)) {
return IcebergUtil.convertCustomCatalogToSRTable(icebergTable, catalogImpl, dbName, tblName, customProperties);
}
return IcebergUtil.convertHiveCatalogToSRTable(icebergTable, metastoreURI, dbName, tblName);
} catch (DdlException e) {
LOG.error("Failed to get iceberg table " + IcebergUtil.getIcebergTableIdentifier(dbName, tblName), e);
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@

package com.starrocks.external.iceberg;

import com.starrocks.catalog.Database;
import com.starrocks.catalog.IcebergTable;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.thrift.TException;

import java.util.List;
import java.util.Map;

/**
Expand Down Expand Up @@ -39,4 +43,10 @@ public interface IcebergCatalog {
Table loadTable(TableIdentifier tableId,
String tableLocation,
Map<String, String> properties) throws StarRocksIcebergException;

List<String> listAllDatabases();

Database getDB(String dbName) throws InterruptedException, TException;

List<TableIdentifier> listTables(Namespace of);
}
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
throw new UnsupportedOperationException("Not implemented");
}

@Override
public List<String> listAllDatabases() {
try {
return new ArrayList<>(clients.run(IMetaStoreClient::getAllDatabases));
Expand All @@ -146,6 +147,7 @@ public List<String> listAllDatabases() {
}
}

@Override
public Database getDB(String dbName) throws InterruptedException, TException {
org.apache.hadoop.hive.metastore.api.Database db = clients.run(client -> client.getDatabase(dbName));
if (db == null || db.getName() == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,13 +177,29 @@ public static Map<PartitionField, Integer> getIdentityPartitions(PartitionSpec p
return columns.build();
}

public static IcebergTable convertToSRTable(org.apache.iceberg.Table icebergTable, String metastoreURI,
/**
 * Wraps a native Iceberg table loaded through a custom catalog implementation
 * into a StarRocks {@link IcebergTable}.
 *
 * @param icebergTable     the loaded native Iceberg table
 * @param catalogImpl      fully-qualified class name of the custom catalog
 * @param dbName           database name as seen by StarRocks
 * @param tblName          table name as seen by StarRocks
 * @param customProperties extra catalog properties; applied last, so they may
 *                         override the bookkeeping entries set here
 * @throws DdlException if schema conversion fails
 */
public static IcebergTable convertCustomCatalogToSRTable(org.apache.iceberg.Table icebergTable, String catalogImpl,
                                                         String dbName, String tblName,
                                                         Map<String, String> customProperties) throws DdlException {
    Map<String, String> srTableProps = new HashMap<>();
    srTableProps.put(IcebergTable.ICEBERG_DB, dbName);
    srTableProps.put(IcebergTable.ICEBERG_TABLE, tblName);
    srTableProps.put(IcebergTable.ICEBERG_CATALOG, "CUSTOM_CATALOG");
    srTableProps.put(IcebergTable.ICEBERG_IMPL, catalogImpl);
    // Custom entries go in last (same order as before) so they can shadow the above.
    srTableProps.putAll(customProperties);
    return convertToSRTable(icebergTable, srTableProps);
}

/**
 * Wraps a native Iceberg table loaded through a Hive metastore catalog into a
 * StarRocks {@link IcebergTable}.
 *
 * <p>Fix: the previous body put {@code ICEBERG_CATALOG -> "HIVE_CATALOG"} twice
 * with the identical key and value; the redundant second put is removed
 * (behavior is unchanged — same final map contents).
 *
 * @param icebergTable the loaded native Iceberg table
 * @param metastoreURI Hive metastore thrift URI
 * @param dbName       database name as seen by StarRocks
 * @param tblName      table name as seen by StarRocks
 * @throws DdlException if schema conversion fails
 */
public static IcebergTable convertHiveCatalogToSRTable(org.apache.iceberg.Table icebergTable, String metastoreURI,
                                                       String dbName, String tblName) throws DdlException {
    Map<String, String> properties = new HashMap<>();
    properties.put(IcebergTable.ICEBERG_CATALOG, "HIVE_CATALOG");
    properties.put(IcebergTable.ICEBERG_DB, dbName);
    properties.put(IcebergTable.ICEBERG_TABLE, tblName);
    properties.put(IcebergTable.ICEBERG_METASTORE_URIS, metastoreURI);
    return convertToSRTable(icebergTable, properties);
}

private static IcebergTable convertToSRTable(org.apache.iceberg.Table icebergTable, Map<String, String> properties)
throws DdlException {
Map<String, Types.NestedField> icebergColumns = icebergTable.schema().columns().stream()
.collect(Collectors.toMap(Types.NestedField::name, field -> field));
List<Column> fullSchema = Lists.newArrayList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import com.google.common.collect.Lists;
import com.starrocks.catalog.Database;
import com.starrocks.catalog.Table;
import com.starrocks.common.DdlException;
import com.starrocks.external.hive.HiveMetaStoreThriftClient;
import com.starrocks.external.iceberg.IcebergHiveCatalog;
import com.starrocks.external.iceberg.IcebergUtil;
Expand All @@ -19,8 +18,12 @@
import org.junit.Test;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static com.starrocks.catalog.IcebergTable.ICEBERG_CATALOG;
import static com.starrocks.catalog.IcebergTable.ICEBERG_METASTORE_URIS;
import static com.starrocks.catalog.Table.TableType.ICEBERG;

public class IcebergMetadataTest {
Expand All @@ -34,8 +37,11 @@ public void testListDatabaseNames(@Mocked HiveMetaStoreThriftClient metaStoreThr
}
};

Map<String, String> properties = new HashMap<>();
String metastoreUris = "thrift://127.0.0.1:9083";
IcebergMetadata metadata = new IcebergMetadata(metastoreUris);
properties.put(ICEBERG_METASTORE_URIS, metastoreUris);
properties.put(ICEBERG_CATALOG, "hive");
IcebergMetadata metadata = new IcebergMetadata(properties);
List<String> expectResult = Lists.newArrayList("db1", "db2");
Assert.assertEquals(expectResult, metadata.listDbNames());
}
Expand All @@ -52,8 +58,11 @@ public void testGetDB(@Mocked IcebergHiveCatalog icebergHiveCatalog) throws Exce
}
};

Map<String, String> properties = new HashMap<>();
String metastoreUris = "thrift://127.0.0.1:9083";
IcebergMetadata metadata = new IcebergMetadata(metastoreUris);
properties.put(ICEBERG_METASTORE_URIS, metastoreUris);
properties.put(ICEBERG_CATALOG, "hive");
IcebergMetadata metadata = new IcebergMetadata(properties);
Database expectResult = new Database(0, db);
Assert.assertEquals(expectResult, metadata.getDb(db));
}
Expand All @@ -73,15 +82,18 @@ public void testListTableNames(@Mocked IcebergHiveCatalog icebergHiveCatalog) th
}
};

Map<String, String> properties = new HashMap<>();
String metastoreUris = "thrift://127.0.0.1:9083";
IcebergMetadata metadata = new IcebergMetadata(metastoreUris);
properties.put(ICEBERG_METASTORE_URIS, metastoreUris);
properties.put(ICEBERG_CATALOG, "hive");
IcebergMetadata metadata = new IcebergMetadata(properties);
List<String> expectResult = Lists.newArrayList("tbl1", "tbl2");
Assert.assertEquals(expectResult, metadata.listTableNames(db1));
}

@Test
public void testGetTable(@Mocked IcebergHiveCatalog icebergHiveCatalog,
@Mocked HiveTableOperations hiveTableOperations) throws Exception {
@Mocked HiveTableOperations hiveTableOperations) {

new Expectations() {
{
Expand All @@ -91,24 +103,31 @@ public void testGetTable(@Mocked IcebergHiveCatalog icebergHiveCatalog,
}
};

String resourceName = "thrift://127.0.0.1:9083";
IcebergMetadata metadata = new IcebergMetadata(resourceName);
Map<String, String> properties = new HashMap<>();
String metastoreUris = "thrift://127.0.0.1:9083";
properties.put(ICEBERG_METASTORE_URIS, metastoreUris);
properties.put(ICEBERG_CATALOG, "hive");
IcebergMetadata metadata = new IcebergMetadata(properties);
Table expectResult = new Table(0, "tbl", ICEBERG, new ArrayList<>());
Assert.assertEquals(expectResult, metadata.getTable("db", "tbl"));
}

@Test
public void testNotExistTable(@Mocked IcebergHiveCatalog icebergHiveCatalog,
@Mocked HiveTableOperations hiveTableOperations) throws DdlException {
@Mocked HiveTableOperations hiveTableOperations) {
new Expectations() {
{
icebergHiveCatalog.loadTable(IcebergUtil.getIcebergTableIdentifier("db", "tbl"));
result = new BaseTable(hiveTableOperations, "tbl");
minTimes = 0;
}
};
String resourceName = "thrift://127.0.0.1:9083";
IcebergMetadata metadata = new IcebergMetadata(resourceName);

Map<String, String> properties = new HashMap<>();
String metastoreUris = "thrift://127.0.0.1:9083";
properties.put(ICEBERG_METASTORE_URIS, metastoreUris);
properties.put(ICEBERG_CATALOG, "hive");
IcebergMetadata metadata = new IcebergMetadata(properties);
Assert.assertNull(metadata.getTable("db", "tbl2").getName());
}
}
Loading

0 comments on commit 54ff4b0

Please sign in to comment.