Skip to content

Commit

Permalink
[BugFix] Ban Hive full ACID tables
Browse files Browse the repository at this point in the history
Signed-off-by: Smith Cruise <chendingchao1@126.com>
  • Loading branch information
Smith-Cruise committed Jan 17, 2024
1 parent 515a360 commit 2e701ca
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand All @@ -46,7 +47,6 @@
import static com.google.common.collect.Iterables.getOnlyElement;
import static com.starrocks.connector.PartitionUtil.toHivePartitionName;
import static com.starrocks.connector.hive.HiveMetastoreApiConverter.toHiveCommonStats;
import static com.starrocks.connector.hive.HiveMetastoreApiConverter.toMetastoreApiPartition;
import static com.starrocks.connector.hive.HiveMetastoreApiConverter.toMetastoreApiTable;
import static com.starrocks.connector.hive.HiveMetastoreApiConverter.updateStatisticsParameters;
import static com.starrocks.connector.hive.HiveMetastoreApiConverter.validateHiveTableType;
Expand Down Expand Up @@ -115,6 +115,11 @@ public Table getTable(String dbName, String tableName) {

if (!HiveMetastoreApiConverter.isHudiTable(table.getSd().getInputFormat())) {
validateHiveTableType(table.getTableType());
if (AcidUtils.isFullAcidTable(table)) {
throw new StarRocksConnectorException(
String.format("%s.%s is a hive transactional table(full acid), sr didn't support it yet", dbName,
tableName));
}
if (table.getTableType().equalsIgnoreCase("VIRTUAL_VIEW")) {
return HiveMetastoreApiConverter.toHiveView(table, catalogName);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,19 @@ public void testGetTable() {
Assert.assertEquals("hive_catalog", hiveTable.getCatalogName());
}

@Test
public void testGetTransactionalTable() {
    CachingHiveMetastore metastoreCache = new CachingHiveMetastore(
            metastore, executor, expireAfterWriteSec, refreshAfterWriteSec, 1000, false);

    // An insert-only transactional table is supported and must resolve normally.
    Assert.assertNotNull(metastoreCache.getTable("transactional_db", "insert_only"));

    // A full ACID transactional table is unsupported and must be rejected.
    Assert.assertThrows(StarRocksConnectorException.class,
            () -> metastoreCache.getTable("transactional_db", "full_acid"));
}

@Test
public void testTableExists() {
CachingHiveMetastore cachingHiveMetastore = new CachingHiveMetastore(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.junit.Assert;
import org.junit.Test;

Expand Down Expand Up @@ -276,6 +277,13 @@ public org.apache.hadoop.hive.metastore.api.Database getDb(String dbName) {
}

public Table getTable(String dbName, String tblName) {
if (dbName.equalsIgnoreCase("transactional_db")) {
if (tblName.equalsIgnoreCase("insert_only")) {
return getTransactionalTable(dbName, tblName, true);
} else if (tblName.equalsIgnoreCase("full_acid")) {
return getTransactionalTable(dbName, tblName, false);
}
}
List<FieldSchema> partKeys = Lists.newArrayList(new FieldSchema("col1", "INT", ""));
List<FieldSchema> unPartKeys = Lists.newArrayList(new FieldSchema("col2", "INT", ""));
String hdfsPath = "hdfs://127.0.0.1:10000/hive";
Expand Down Expand Up @@ -303,6 +311,35 @@ public Table getTable(String dbName, String tblName) {
return msTable1;
}

/**
 * Builds a mocked Hive metastore table flagged as transactional.
 *
 * @param dbName     database name to stamp on the mock table
 * @param tblName    table name to stamp on the mock table
 * @param insertOnly when true the table is insert-only transactional;
 *                   when false it is a full ACID table ("default" properties)
 * @return an unpartitioned, ORC-backed MANAGED_TABLE carrying transactional parameters
 */
private Table getTransactionalTable(String dbName, String tblName, boolean insertOnly) {
    StorageDescriptor storageDescriptor = new StorageDescriptor();
    storageDescriptor.setCols(Lists.newArrayList(new FieldSchema("col2", "INT", "")));
    storageDescriptor.setLocation("hdfs://127.0.0.1:10000/hive");
    storageDescriptor.setInputFormat("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat");

    SerDeInfo serde = new SerDeInfo();
    serde.setParameters(ImmutableMap.of());
    storageDescriptor.setSerdeInfo(serde);

    Table mockTable = new Table();
    mockTable.setDbName(dbName);
    mockTable.setTableName(tblName);
    mockTable.setSd(storageDescriptor);
    mockTable.setTableType("MANAGED_TABLE");

    // The transactional_properties value is the only difference between the two flavors:
    // "insert_only" marks an insert-only table, "default" marks a full ACID table.
    String transactionalProperties = insertOnly ? "insert_only" : "default";
    mockTable.setParameters(ImmutableMap.of(
            hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, "true",
            hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES, transactionalProperties));

    mockTable.setPartitionKeys(new ArrayList<>());
    return mockTable;
}

/**
 * Reports whether the mock metastore can resolve the given table.
 * A table "exists" here iff {@code getTable} yields a non-null result.
 */
public boolean tableExists(String dbName, String tblName) {
    Table resolved = getTable(dbName, tblName);
    return resolved != null;
}
Expand Down

0 comments on commit 2e701ca

Please sign in to comment.