Remove usage of TypeSignature in Hive connector
martint committed Oct 16, 2019
1 parent 7ef8de4 commit 5217f63
Showing 42 changed files with 243 additions and 201 deletions.
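In short, HiveColumnHandle now carries a resolved engine Type instead of a TypeSignature, so connector code no longer needs a TypeManager just to look the type back up; the TypeManager is only needed where handles are first built from Hive metadata. A minimal before/after sketch of the call pattern (the locals typeManager and columnHandle are assumed, not taken from the diff):

    // Before this commit: the handle exposed only a TypeSignature, so every
    // consumer had to resolve it through the engine's TypeManager.
    Type before = typeManager.getType(columnHandle.getTypeSignature());

    // After this commit: the handle stores the resolved Type directly.
    Type after = columnHandle.getType();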
5 changes: 5 additions & 0 deletions presto-hive/pom.xml
@@ -185,6 +185,11 @@
<artifactId>javax.inject</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>

<!-- used by tests but also needed transitively -->
<dependency>
<groupId>io.airlift</groupId>
presto-hive/src/main/java/io/prestosql/plugin/hive/GenericHiveRecordCursor.java
@@ -22,7 +22,6 @@
import io.prestosql.spi.type.DecimalType;
import io.prestosql.spi.type.Decimals;
import io.prestosql.spi.type.Type;
import io.prestosql.spi.type.TypeManager;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.type.HiveDecimal;
@@ -121,8 +120,7 @@ public GenericHiveRecordCursor(
long totalBytes,
Properties splitSchema,
List<HiveColumnHandle> columns,
DateTimeZone hiveStorageTimeZone,
TypeManager typeManager)
DateTimeZone hiveStorageTimeZone)
{
requireNonNull(path, "path is null");
requireNonNull(recordReader, "recordReader is null");
@@ -162,7 +160,7 @@ public GenericHiveRecordCursor(
HiveColumnHandle column = columns.get(i);
checkState(column.getColumnType() == REGULAR, "column type must be regular");

types[i] = typeManager.getType(column.getTypeSignature());
types[i] = column.getType();
hiveTypes[i] = column.getHiveType();

StructField field = rowInspector.getStructFieldRef(column.getName());
presto-hive/src/main/java/io/prestosql/plugin/hive/GenericHiveRecordCursorProvider.java
@@ -93,8 +93,7 @@ public Optional<RecordCursor> createRecordCursor(
length,
schema,
columns,
hiveStorageTimeZone,
typeManager));
hiveStorageTimeZone));
});
}

presto-hive/src/main/java/io/prestosql/plugin/hive/HiveColumnHandle.java
@@ -17,8 +17,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import io.prestosql.spi.connector.ColumnHandle;
import io.prestosql.spi.connector.ColumnMetadata;
import io.prestosql.spi.type.TypeManager;
import io.prestosql.spi.type.TypeSignature;
import io.prestosql.spi.type.Type;

import java.util.Objects;
import java.util.Optional;
@@ -30,6 +29,8 @@
import static io.prestosql.plugin.hive.HiveType.HIVE_LONG;
import static io.prestosql.plugin.hive.HiveType.HIVE_STRING;
import static io.prestosql.spi.type.BigintType.BIGINT;
import static io.prestosql.spi.type.IntegerType.INTEGER;
import static io.prestosql.spi.type.VarcharType.VARCHAR;
import static java.util.Objects.requireNonNull;

public class HiveColumnHandle
@@ -38,22 +39,22 @@ public class HiveColumnHandle
public static final int PATH_COLUMN_INDEX = -11;
public static final String PATH_COLUMN_NAME = "$path";
public static final HiveType PATH_HIVE_TYPE = HIVE_STRING;
public static final TypeSignature PATH_TYPE_SIGNATURE = PATH_HIVE_TYPE.getTypeSignature();
public static final Type PATH_TYPE = VARCHAR;

public static final int BUCKET_COLUMN_INDEX = -12;
public static final String BUCKET_COLUMN_NAME = "$bucket";
public static final HiveType BUCKET_HIVE_TYPE = HIVE_INT;
public static final TypeSignature BUCKET_TYPE_SIGNATURE = BUCKET_HIVE_TYPE.getTypeSignature();
public static final Type BUCKET_TYPE_SIGNATURE = INTEGER;

public static final int FILE_SIZE_COLUMN_INDEX = -13;
public static final String FILE_SIZE_COLUMN_NAME = "$file_size";
public static final HiveType FILE_SIZE_TYPE = HIVE_LONG;
public static final TypeSignature FILE_SIZE_TYPE_SIGNATURE = FILE_SIZE_TYPE.getTypeSignature();
public static final Type FILE_SIZE_TYPE_SIGNATURE = BIGINT;

public static final int FILE_MODIFIED_TIME_COLUMN_INDEX = -14;
public static final String FILE_MODIFIED_TIME_COLUMN_NAME = "$file_modified_time";
public static final HiveType FILE_MODIFIED_TIME_TYPE = HIVE_LONG;
public static final TypeSignature FILE_MODIFIED_TIME_TYPE_SIGNATURE = FILE_SIZE_TYPE.getTypeSignature();
public static final Type FILE_MODIFIED_TIME_TYPE_SIGNATURE = BIGINT;

private static final String UPDATE_ROW_ID_COLUMN_NAME = "$shard_row_id";

@@ -66,7 +67,7 @@ public enum ColumnType

private final String name;
private final HiveType hiveType;
private final TypeSignature typeName;
private final Type type;
private final int hiveColumnIndex;
private final ColumnType columnType;
private final Optional<String> comment;
@@ -75,7 +76,7 @@ public enum ColumnType
public HiveColumnHandle(
@JsonProperty("name") String name,
@JsonProperty("hiveType") HiveType hiveType,
@JsonProperty("typeSignature") TypeSignature typeSignature,
@JsonProperty("type") Type type,
@JsonProperty("hiveColumnIndex") int hiveColumnIndex,
@JsonProperty("columnType") ColumnType columnType,
@JsonProperty("comment") Optional<String> comment)
@@ -84,7 +85,7 @@ public HiveColumnHandle(
checkArgument(hiveColumnIndex >= 0 || columnType == PARTITION_KEY || columnType == SYNTHESIZED, "hiveColumnIndex is negative");
this.hiveColumnIndex = hiveColumnIndex;
this.hiveType = requireNonNull(hiveType, "hiveType is null");
this.typeName = requireNonNull(typeSignature, "type is null");
this.type = requireNonNull(type, "type is null");
this.columnType = requireNonNull(columnType, "columnType is null");
this.comment = requireNonNull(comment, "comment is null");
}
@@ -117,9 +118,9 @@ public boolean isHidden()
return columnType == SYNTHESIZED;
}

public ColumnMetadata getColumnMetadata(TypeManager typeManager)
public ColumnMetadata getColumnMetadata()
{
return new ColumnMetadata(name, typeManager.getType(typeName), null, isHidden());
return new ColumnMetadata(name, type, null, isHidden());
}

@JsonProperty
@@ -129,9 +130,9 @@ public Optional<String> getComment()
}

@JsonProperty
public TypeSignature getTypeSignature()
public Type getType()
{
return typeName;
return type;
}

@JsonProperty
@@ -177,12 +178,12 @@ public static HiveColumnHandle updateRowIdHandle()
// plan-time support for row-by-row delete so that planning doesn't fail. This is why we need
// rowid handle. Note that in Hive connector, rowid handle is not implemented beyond plan-time.

return new HiveColumnHandle(UPDATE_ROW_ID_COLUMN_NAME, HIVE_LONG, BIGINT.getTypeSignature(), -1, SYNTHESIZED, Optional.empty());
return new HiveColumnHandle(UPDATE_ROW_ID_COLUMN_NAME, HIVE_LONG, BIGINT, -1, SYNTHESIZED, Optional.empty());
}

public static HiveColumnHandle pathColumnHandle()
{
return new HiveColumnHandle(PATH_COLUMN_NAME, PATH_HIVE_TYPE, PATH_TYPE_SIGNATURE, PATH_COLUMN_INDEX, SYNTHESIZED, Optional.empty());
return new HiveColumnHandle(PATH_COLUMN_NAME, PATH_HIVE_TYPE, PATH_TYPE, PATH_COLUMN_INDEX, SYNTHESIZED, Optional.empty());
}

/**
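For illustration, a sketch of the new constructor shape introduced above (the column name and argument values are hypothetical; real call sites appear in HiveMetadata below):

    // A hypothetical partition-key column built with the post-change constructor:
    // the third argument is now the resolved engine Type rather than a TypeSignature.
    HiveColumnHandle dsColumn = new HiveColumnHandle(
            "ds",                                      // hypothetical column name
            HiveType.HIVE_STRING,                      // Hive-side type
            VarcharType.VARCHAR,                       // engine Type (previously VARCHAR.getTypeSignature())
            -1,                                        // negative index is allowed for partition keys (see checkArgument above)
            HiveColumnHandle.ColumnType.PARTITION_KEY,
            Optional.empty());                         // no comment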
presto-hive/src/main/java/io/prestosql/plugin/hive/HiveMetadata.java
@@ -339,8 +339,8 @@ public HiveTableHandle getTableHandle(ConnectorSession session, SchemaTableName
tableName.getSchemaName(),
tableName.getTableName(),
table.get().getParameters(),
getPartitionKeyColumnHandles(table.get()),
getHiveBucketHandle(table.get()));
getPartitionKeyColumnHandles(table.get(), typeManager),
getHiveBucketHandle(table.get(), typeManager));
}

@Override
@@ -418,14 +418,13 @@ private Optional<SystemTable> getPartitionsSystemTable(ConnectorSession session,
}

List<Type> partitionColumnTypes = partitionColumns.stream()
.map(HiveColumnHandle::getTypeSignature)
.map(typeManager::getType)
.map(HiveColumnHandle::getType)
.collect(toImmutableList());

List<ColumnMetadata> partitionSystemTableColumns = partitionColumns.stream()
.map(column -> new ColumnMetadata(
column.getName(),
typeManager.getType(column.getTypeSignature()),
column.getType(),
column.getComment().orElse(null),
column.isHidden()))
.collect(toImmutableList());
@@ -482,9 +481,9 @@ private ConnectorTableMetadata doGetTableMetadata(ConnectorSession session, Sche
throw new TableNotFoundException(tableName);
}

Function<HiveColumnHandle, ColumnMetadata> metadataGetter = columnMetadataGetter(table.get(), typeManager);
Function<HiveColumnHandle, ColumnMetadata> metadataGetter = columnMetadataGetter(table.get());
ImmutableList.Builder<ColumnMetadata> columns = ImmutableList.builder();
for (HiveColumnHandle columnHandle : hiveColumnHandles(table.get())) {
for (HiveColumnHandle columnHandle : hiveColumnHandles(table.get(), typeManager)) {
columns.add(metadataGetter.apply(columnHandle));
}

@@ -620,7 +619,7 @@ public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, Conn
SchemaTableName tableName = ((HiveTableHandle) tableHandle).getSchemaTableName();
Table table = metastore.getTable(new HiveIdentity(session), tableName.getSchemaName(), tableName.getTableName())
.orElseThrow(() -> new TableNotFoundException(tableName));
return hiveColumnHandles(table).stream()
return hiveColumnHandles(table, typeManager).stream()
.collect(toImmutableMap(HiveColumnHandle::getName, identity()));
}

@@ -684,7 +683,7 @@ private List<SchemaTableName> listTables(ConnectorSession session, SchemaTablePr
@Override
public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle columnHandle)
{
return ((HiveColumnHandle) columnHandle).getColumnMetadata(typeManager);
return ((HiveColumnHandle) columnHandle).getColumnMetadata();
}

@Override
@@ -1087,7 +1086,7 @@ public void finishStatisticsCollection(ConnectorSession session, ConnectorTableH
List<String> partitionColumnNames = partitionColumns.stream()
.map(Column::getName)
.collect(toImmutableList());
List<HiveColumnHandle> hiveColumnHandles = hiveColumnHandles(table);
List<HiveColumnHandle> hiveColumnHandles = hiveColumnHandles(table, typeManager);
Map<String, Type> columnTypes = hiveColumnHandles.stream()
.filter(columnHandle -> !columnHandle.isHidden())
.collect(toImmutableMap(HiveColumnHandle::getName, column -> column.getHiveType().getType(typeManager)));
@@ -1115,7 +1114,7 @@ public void finishStatisticsCollection(ConnectorSession session, ConnectorTableH
Map<String, Set<ColumnStatisticType>> columnStatisticTypes = hiveColumnHandles.stream()
.filter(columnHandle -> !partitionColumnNames.contains(columnHandle.getName()))
.filter(column -> !column.isHidden())
.collect(toImmutableMap(HiveColumnHandle::getName, column -> ImmutableSet.copyOf(metastore.getSupportedColumnStatistics(typeManager.getType(column.getTypeSignature())))));
.collect(toImmutableMap(HiveColumnHandle::getName, column -> ImmutableSet.copyOf(metastore.getSupportedColumnStatistics(column.getType()))));
Supplier<PartitionStatistics> emptyPartitionStatistics = Suppliers.memoize(() -> createEmptyPartitionStatistics(columnTypes, columnStatisticTypes));

int usedComputedStatistics = 0;
@@ -1389,7 +1388,7 @@ public HiveInsertTableHandle beginInsert(ConnectorSession session, ConnectorTabl
}
}

List<HiveColumnHandle> handles = hiveColumnHandles(table).stream()
List<HiveColumnHandle> handles = hiveColumnHandles(table, typeManager).stream()
.filter(columnHandle -> !columnHandle.isHidden())
.collect(toList());

@@ -1958,7 +1957,7 @@ public Optional<ConnectorNewTableLayout> getInsertLayout(ConnectorSession sessio
}
}

Optional<HiveBucketHandle> hiveBucketHandle = getHiveBucketHandle(table);
Optional<HiveBucketHandle> hiveBucketHandle = getHiveBucketHandle(table, typeManager);
if (!hiveBucketHandle.isPresent()) {
return Optional.empty();
}
@@ -2206,7 +2205,7 @@ else if (column.isHidden()) {
columnHandles.add(new HiveColumnHandle(
column.getName(),
toHiveType(typeTranslator, column.getType()),
column.getType().getTypeSignature(),
column.getType(),
ordinal,
columnType,
Optional.ofNullable(column.getComment())));
@@ -2236,7 +2235,7 @@ private static void validateCsvColumns(ConnectorTableMetadata tableMetadata)
}
}

private static Function<HiveColumnHandle, ColumnMetadata> columnMetadataGetter(Table table, TypeManager typeManager)
private static Function<HiveColumnHandle, ColumnMetadata> columnMetadataGetter(Table table)
{
ImmutableList.Builder<String> columnNames = ImmutableList.builder();
table.getPartitionColumns().stream().map(Column::getName).forEach(columnNames::add);
@@ -2269,7 +2268,7 @@ private static Function<HiveColumnHandle, ColumnMetadata> columnMetadataGetter(T

return handle -> new ColumnMetadata(
handle.getName(),
typeManager.getType(handle.getTypeSignature()),
handle.getType(),
columnComment.get(handle.getName()).orElse(null),
columnExtraInfo(handle.isPartitionKey()),
handle.isHidden());
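The HiveMetadata changes above fall into two patterns, sketched here with the same method names as the diff (the surrounding locals table, handle, and typeManager are assumed): helpers that build handles from a metastore Table now receive the TypeManager so each Hive type is resolved to an engine Type once, while code that already holds a handle reads the type directly.

    // Building handles: resolve Hive types to engine Types up front.
    List<HiveColumnHandle> handles = hiveColumnHandles(table, typeManager);

    // Consuming handles: no TypeManager needed anymore.
    Type columnType = handle.getType();
    ColumnMetadata metadata = handle.getColumnMetadata();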
29 changes: 29 additions & 0 deletions presto-hive/src/main/java/io/prestosql/plugin/hive/HiveModule.java
@@ -13,6 +13,8 @@
*/
package io.prestosql.plugin.hive;

import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.deser.std.FromStringDeserializer;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
@@ -35,7 +37,11 @@
import io.prestosql.spi.connector.ConnectorPageSinkProvider;
import io.prestosql.spi.connector.ConnectorPageSourceProvider;
import io.prestosql.spi.connector.ConnectorSplitManager;
import io.prestosql.spi.type.Type;
import io.prestosql.spi.type.TypeId;
import io.prestosql.spi.type.TypeManager;

import javax.inject.Inject;
import javax.inject.Singleton;

import java.util.concurrent.ExecutorService;
@@ -45,7 +51,9 @@
import static com.google.inject.multibindings.Multibinder.newSetBinder;
import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static io.airlift.configuration.ConfigBinder.configBinder;
import static io.airlift.json.JsonBinder.jsonBinder;
import static io.airlift.json.JsonCodecBinder.jsonCodecBinder;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static org.weakref.jmx.guice.ExportBinder.newExporter;

@@ -117,6 +125,8 @@ public void configure(Binder binder)

configBinder(binder).bindConfig(ParquetReaderConfig.class);
configBinder(binder).bindConfig(ParquetWriterConfig.class);

jsonBinder(binder).addDeserializerBinding(Type.class).to(TypeDeserializer.class);
}

@ForHive
@@ -133,4 +143,23 @@ public Function<HiveTransactionHandle, SemiTransactionalHiveMetastore> createMet
{
return transactionHandle -> ((HiveMetadata) transactionManager.get(transactionHandle)).getMetastore();
}

public static final class TypeDeserializer
extends FromStringDeserializer<Type>
{
private final TypeManager typeManager;

@Inject
public TypeDeserializer(TypeManager typeManager)
{
super(Type.class);
this.typeManager = requireNonNull(typeManager, "metadata is null");
}

@Override
protected Type _deserialize(String value, DeserializationContext context)
{
return typeManager.getType(TypeId.of(value));
}
}
}
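The jackson-databind dependency added to pom.xml exists for this TypeDeserializer: with Type now appearing directly in JSON-serialized handles, the connector needs a way to turn a serialized type id back into an engine Type. A small standalone sketch of the behavior, assuming a test-style harness that is not part of the commit (inside the connector the binding comes from jsonBinder(binder).addDeserializerBinding(Type.class), and the serializer side is provided by the engine's JSON bindings):

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.module.SimpleModule;
    import io.prestosql.plugin.hive.HiveModule;
    import io.prestosql.spi.type.Type;
    import io.prestosql.spi.type.TypeManager;

    final class TypeDeserializerSketch
    {
        private TypeDeserializerSketch() {}

        // Resolves a JSON string such as "bigint" through the TypeManager,
        // mirroring what the injector-registered deserializer does.
        static Type fromJson(TypeManager typeManager, String json)
                throws Exception
        {
            ObjectMapper mapper = new ObjectMapper()
                    .registerModule(new SimpleModule()
                            .addDeserializer(Type.class, new HiveModule.TypeDeserializer(typeManager)));
            return mapper.readValue(json, Type.class);
        }
    }

For example, fromJson(typeManager, "\"bigint\"") would yield the engine's BIGINT type.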
presto-hive/src/main/java/io/prestosql/plugin/hive/HivePageSink.java
@@ -32,7 +32,6 @@
import io.prestosql.spi.connector.ConnectorPageSink;
import io.prestosql.spi.connector.ConnectorSession;
import io.prestosql.spi.type.Type;
import io.prestosql.spi.type.TypeManager;
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;

@@ -93,7 +92,6 @@ public HivePageSink(
List<HiveColumnHandle> inputColumns,
Optional<HiveBucketProperty> bucketProperty,
PageIndexerFactory pageIndexerFactory,
TypeManager typeManager,
HdfsEnvironment hdfsEnvironment,
int maxOpenWriters,
ListeningExecutorService writeVerificationExecutor,
@@ -115,8 +113,7 @@ public HivePageSink(
this.pagePartitioner = new HiveWriterPagePartitioner(
inputColumns,
bucketProperty.isPresent(),
pageIndexerFactory,
typeManager);
pageIndexerFactory);

// determine the input index of the partition columns and data columns
// and determine the input index and type of bucketing columns
@@ -401,15 +398,14 @@ private static class HiveWriterPagePartitioner
public HiveWriterPagePartitioner(
List<HiveColumnHandle> inputColumns,
boolean bucketed,
PageIndexerFactory pageIndexerFactory,
TypeManager typeManager)
PageIndexerFactory pageIndexerFactory)
{
requireNonNull(inputColumns, "inputColumns is null");
requireNonNull(pageIndexerFactory, "pageIndexerFactory is null");

List<Type> partitionColumnTypes = inputColumns.stream()
.filter(HiveColumnHandle::isPartitionKey)
.map(column -> typeManager.getType(column.getTypeSignature()))
.map(HiveColumnHandle::getType)
.collect(toList());

if (bucketed) {
presto-hive/src/main/java/io/prestosql/plugin/hive/HivePageSinkProvider.java
@@ -167,7 +167,6 @@ private ConnectorPageSink createPageSink(HiveWritableTableHandle handle, boolean
handle.getInputColumns(),
handle.getBucketProperty(),
pageIndexerFactory,
typeManager,
hdfsEnvironment,
maxOpenPartitions,
writeVerificationExecutor,
