Skip to content

Commit

Permalink
Use PAGEFILE format for hive unsupported type
Browse files Browse the repository at this point in the history
  • Loading branch information
viczhang861 authored and wenleix committed Sep 18, 2020
1 parent 2d228db commit 28bd920
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ default TypeInfo translate(Type type)
}

/**
 * Translates {@code type} to a Hive {@link TypeInfo}.
 *
 * @param type source type to be translated
 * @param defaultHiveType When there is no mapping type in Hive to translate {@code type} to,
 * use the provided default Hive type when necessary. For example,
 * an explicit type is not needed for the PageFile format.
 * @return the Hive type information that {@code type} was translated to
 */
TypeInfo translate(Type type, Optional<HiveType> defaultHiveType);
}
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@
import static com.facebook.presto.hive.HiveSessionProperties.isSortedWriteToTempPathEnabled;
import static com.facebook.presto.hive.HiveSessionProperties.isSortedWritingEnabled;
import static com.facebook.presto.hive.HiveSessionProperties.isStatisticsEnabled;
import static com.facebook.presto.hive.HiveSessionProperties.isUsePageFileForHiveUnsupportedType;
import static com.facebook.presto.hive.HiveStorageFormat.AVRO;
import static com.facebook.presto.hive.HiveStorageFormat.DWRF;
import static com.facebook.presto.hive.HiveStorageFormat.ORC;
Expand Down Expand Up @@ -958,6 +959,14 @@ public ConnectorTableHandle createTemporaryTable(ConnectorSession session, List<
}
});

if (isUsePageFileForHiveUnsupportedType(session)) {
if (!columns.stream()
.map(ColumnMetadata::getType)
.allMatch(HiveTypeTranslator::isSupportedHiveType)) {
storageFormat = PAGEFILE;
}
}

// The PAGEFILE format doesn't require translation to a Hive type;
// choose HIVE_BINARY as the default Hive type to keep it compatible with the Hive connector.
Optional<HiveType> defaultHiveType = storageFormat == PAGEFILE ? Optional.of(HIVE_BINARY) : Optional.empty();
Expand All @@ -971,8 +980,10 @@ public ConnectorTableHandle createTemporaryTable(ConnectorSession session, List<
ImmutableSet.of(),
typeTranslator,
defaultHiveType);

validateColumns(storageFormat, columnHandles);

HiveStorageFormat finalStorageFormat = storageFormat;
Table table = Table.builder()
.setDatabaseName(schemaName)
.setTableName(tableName)
Expand All @@ -982,7 +993,7 @@ public ConnectorTableHandle createTemporaryTable(ConnectorSession session, List<
.map(handle -> new Column(handle.getName(), handle.getHiveType(), handle.getComment()))
.collect(toImmutableList()))
.withStorage(storage -> storage
.setStorageFormat(fromHiveStorageFormat(storageFormat))
.setStorageFormat(fromHiveStorageFormat(finalStorageFormat))
.setBucketProperty(bucketProperty)
.setLocation(""))
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,62 @@ else if (varcharLength == VarcharType.UNBOUNDED_LENGTH) {
.collect(toList()));
}
return defaultHiveType
.orElseThrow(() -> new PrestoException(NOT_SUPPORTED, format("Unsupported Hive type: %s", type)))
.orElseThrow(() -> new PrestoException(NOT_SUPPORTED, format("No default Hive type provided for unsupported Hive type: %s", type)))
.getTypeInfo();
}

/**
 * Reports whether {@code type} is accepted as a Hive-supported type.
 *
 * <p>A type is accepted when it is one of the listed non-parametric types, when it is a
 * VARCHAR, CHAR or DECIMAL type, or when it is an array, map or row type whose checked
 * type parameters are themselves accepted. Anything else is rejected.
 *
 * <p>NOTE(review): VARCHAR and CHAR are accepted regardless of their declared length.
 * Confirm that this agrees with the length handling in {@code translate}.
 *
 * @param type the type to inspect
 * @return {@code true} if the type is accepted, {@code false} otherwise
 */
public static boolean isSupportedHiveType(Type type)
{
    // Non-parametric types are recognised by equality with the well-known instances.
    boolean isPlainType = BOOLEAN.equals(type)
            || BIGINT.equals(type)
            || INTEGER.equals(type)
            || SMALLINT.equals(type)
            || TINYINT.equals(type)
            || REAL.equals(type)
            || DOUBLE.equals(type)
            || VARBINARY.equals(type)
            || DATE.equals(type)
            || TIMESTAMP.equals(type);
    if (isPlainType) {
        return true;
    }

    // Parametric scalar types are accepted whatever their parameters are.
    if (type instanceof VarcharType || type instanceof CharType || type instanceof DecimalType) {
        return true;
    }

    // Container types: the answer depends on the nested types.
    if (isArrayType(type)) {
        Type firstParameter = type.getTypeParameters().get(0);
        return isSupportedHiveType(firstParameter);
    }
    if (isMapType(type)) {
        // The second parameter is only looked at once the first one has been accepted.
        if (!isSupportedHiveType(type.getTypeParameters().get(0))) {
            return false;
        }
        return isSupportedHiveType(type.getTypeParameters().get(1));
    }
    if (isRowType(type)) {
        for (Type parameter : type.getTypeParameters()) {
            if (!isSupportedHiveType(parameter)) {
                return false;
            }
        }
        return true;
    }

    return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@

import static com.facebook.presto.SystemSessionProperties.ENABLE_STATS_COLLECTION_FOR_TEMPORARY_TABLE;
import static com.facebook.presto.hive.HiveQueryRunner.createMaterializingQueryRunner;
import static com.facebook.presto.hive.HiveStorageFormat.ORC;
import static com.facebook.presto.hive.HiveStorageFormat.PAGEFILE;
import static com.facebook.presto.hive.TestHiveIntegrationSmokeTest.assertRemoteMaterializedExchangesCount;
import static com.facebook.presto.sql.tree.ExplainType.Type.LOGICAL;
import static com.google.common.collect.Iterables.getOnlyElement;
import static io.airlift.tpch.TpchTable.getTables;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertThrows;

public class TestHiveDistributedQueriesWithExchangeMaterialization
extends AbstractTestDistributedQueries
Expand All @@ -42,9 +45,19 @@ public void testMaterializedExchangesEnabled()

/**
 * Runs the materialization scenario for each combination of temporary table storage format
 * and the "use PAGEFILE for Hive unsupported type" option covered by this test.
 */
@Test
public void testMaterializeHiveUnsupportedTypeForTemporaryTable()
{
    final boolean usePageFile = true;
    final boolean doNotUsePageFile = false;

    // These two combinations are expected to complete without an error.
    testMaterializeHiveUnsupportedTypeForTemporaryTable(ORC, usePageFile);
    testMaterializeHiveUnsupportedTypeForTemporaryTable(PAGEFILE, doNotUsePageFile);

    // ORC with the option switched off is expected to fail.
    assertThrows(
            RuntimeException.class,
            () -> testMaterializeHiveUnsupportedTypeForTemporaryTable(ORC, doNotUsePageFile));
}

private void testMaterializeHiveUnsupportedTypeForTemporaryTable(
HiveStorageFormat storageFormat,
boolean usePageFileForHiveUnsupportedType)
{
Session session = Session.builder(getSession())
.setCatalogSessionProperty("hive", "temporary_table_storage_format", "PAGEFILE")
.setCatalogSessionProperty("hive", "temporary_table_storage_format", storageFormat.name())
.setCatalogSessionProperty("hive", "use_pagefile_for_hive_unsupported_type", String.valueOf(usePageFileForHiveUnsupportedType))
.setSystemProperty(ENABLE_STATS_COLLECTION_FOR_TEMPORARY_TABLE, "true")
.build();

Expand Down Expand Up @@ -100,9 +113,19 @@ public void testMaterializeHiveUnsupportedTypeForTemporaryTable()

/**
 * Runs the bucketing scenario for each combination of temporary table storage format
 * and the "use PAGEFILE for Hive unsupported type" option covered by this test.
 */
@Test
public void testBucketedByHiveUnsupportedTypeForTemporaryTable()
{
    final boolean usePageFile = true;
    final boolean doNotUsePageFile = false;

    // These two combinations are expected to complete without an error.
    testBucketedByHiveUnsupportedTypeForTemporaryTable(ORC, usePageFile);
    testBucketedByHiveUnsupportedTypeForTemporaryTable(PAGEFILE, doNotUsePageFile);

    // ORC with the option switched off is expected to fail.
    assertThrows(
            RuntimeException.class,
            () -> testBucketedByHiveUnsupportedTypeForTemporaryTable(ORC, doNotUsePageFile));
}

private void testBucketedByHiveUnsupportedTypeForTemporaryTable(
HiveStorageFormat storageFormat,
boolean usePageFileForHiveUnsupportedType)
{
Session session = Session.builder(getSession())
.setCatalogSessionProperty("hive", "temporary_table_storage_format", "PAGEFILE")
.setCatalogSessionProperty("hive", "temporary_table_storage_format", storageFormat.name())
.setCatalogSessionProperty("hive", "use_pagefile_for_hive_unsupported_type", String.valueOf(usePageFileForHiveUnsupportedType))
.setCatalogSessionProperty("hive", "bucket_function_type_for_exchange", "PRESTO_NATIVE")
.build();

Expand Down

0 comments on commit 28bd920

Please sign in to comment.