Skip to content

Commit

Permalink
Merge pull request prestodb#2 from twitter-forks/appendNullsForMissingParquetColumns
Browse files Browse the repository at this point in the history

Append nulls for missing values in Parquet.
  • Loading branch information
saileshmittal committed Oct 1, 2015
2 parents 5e68e6b + bce8187 commit 2fef94e
Showing 1 changed file with 50 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ public PrestoReadSupport(boolean useParquetColumnNames, List<HiveColumnHandle> c
converters.add(new ParquetPrimitiveColumnConverter(i));
}
else {
converters.add(new ParquetColumnConverter(createGroupConverter(types[i], parquetType.getName(), parquetType), i));
converters.add(new ParquetColumnConverter(createGroupConverter(types[i], parquetType.getName(), parquetType, i), i));
}
}
}
Expand Down Expand Up @@ -674,25 +674,25 @@ private abstract static class GroupedConverter
public abstract Block getBlock();
}

private static BlockConverter createConverter(Type prestoType, String columnName, parquet.schema.Type parquetType)
private static BlockConverter createConverter(Type prestoType, String columnName, parquet.schema.Type parquetType, int fieldIndex)
{
if (parquetType.isPrimitive()) {
return new ParquetPrimitiveConverter(prestoType);
return new ParquetPrimitiveConverter(prestoType, fieldIndex);
}

return createGroupConverter(prestoType, columnName, parquetType);
return createGroupConverter(prestoType, columnName, parquetType, fieldIndex);
}

private static GroupedConverter createGroupConverter(Type prestoType, String columnName, parquet.schema.Type parquetType)
private static GroupedConverter createGroupConverter(Type prestoType, String columnName, parquet.schema.Type parquetType, int fieldIndex)
{
GroupType groupType = parquetType.asGroupType();
switch (prestoType.getTypeSignature().getBase()) {
case ARRAY:
return new ParquetListConverter(prestoType, columnName, groupType);
return new ParquetListConverter(prestoType, columnName, groupType, fieldIndex);
case MAP:
return new ParquetMapConverter(prestoType, columnName, groupType);
return new ParquetMapConverter(prestoType, columnName, groupType, fieldIndex);
case ROW:
return new ParquetStructConverter(prestoType, columnName, groupType);
return new ParquetStructConverter(prestoType, columnName, groupType, fieldIndex);
default:
throw new IllegalArgumentException("Column " + columnName + " type " + parquetType.getOriginalType() + " not supported");
}
Expand All @@ -705,25 +705,27 @@ private static class ParquetStructConverter
private static final int NULL_BUILDER_SIZE_IN_BYTES_THRESHOLD = 32768;

private final Type rowType;
private final int fieldIndex;

private final List<BlockConverter> converters;
private BlockBuilder builder;
private BlockBuilder nullBuilder; // used internally when builder is set to null
private BlockBuilder currentEntryBuilder;

public ParquetStructConverter(Type prestoType, String columnName, GroupType entryType)
public ParquetStructConverter(Type prestoType, String columnName, GroupType entryType, int fieldIndex)
{
checkArgument(ROW.equals(prestoType.getTypeSignature().getBase()));
List<Type> prestoTypeParameters = prestoType.getTypeParameters();
List<parquet.schema.Type> fieldTypes = entryType.getFields();
checkArgument(prestoTypeParameters.size() == fieldTypes.size());

this.rowType = prestoType;
this.fieldIndex = fieldIndex;

ImmutableList.Builder<BlockConverter> converters = ImmutableList.builder();
for (int i = 0; i < prestoTypeParameters.size(); i++) {
parquet.schema.Type fieldType = fieldTypes.get(i);
converters.add(createConverter(prestoTypeParameters.get(i), columnName + "." + fieldType.getName(), fieldType));
converters.add(createConverter(prestoTypeParameters.get(i), columnName + "." + fieldType.getName(), fieldType, i));
}
this.converters = converters.build();
}
Expand All @@ -750,6 +752,9 @@ public void start()
currentEntryBuilder = nullBuilder.beginBlockEntry();
}
else {
while (builder.getPositionCount() < fieldIndex) {
builder.appendNull();
}
currentEntryBuilder = builder.beginBlockEntry();
}
for (BlockConverter converter : converters) {
Expand All @@ -763,6 +768,10 @@ public void end()
for (BlockConverter converter : converters) {
converter.afterValue();
}
while (currentEntryBuilder.getPositionCount() < converters.size()) {
currentEntryBuilder.appendNull();
}

if (builder == null) {
nullBuilder.closeEntry();
}
Expand Down Expand Up @@ -791,13 +800,14 @@ private static class ParquetListConverter
private static final int NULL_BUILDER_SIZE_IN_BYTES_THRESHOLD = 32768;

private final Type arrayType;
private final int fieldIndex;

private final BlockConverter elementConverter;
private BlockBuilder builder;
private BlockBuilder nullBuilder; // used internally when builder is set to null
private BlockBuilder currentEntryBuilder;

public ParquetListConverter(Type prestoType, String columnName, GroupType listType)
public ParquetListConverter(Type prestoType, String columnName, GroupType listType, int fieldIndex)
{
checkArgument(listType.getFieldCount() == 1,
"Expected LIST column '%s' to only have one field, but has %s fields",
Expand All @@ -806,6 +816,7 @@ public ParquetListConverter(Type prestoType, String columnName, GroupType listTy
checkArgument(ARRAY.equals(prestoType.getTypeSignature().getBase()));

this.arrayType = prestoType;
this.fieldIndex = fieldIndex;

// The Parquet specification requires that the element value of a
// LIST type be wrapped in an inner repeated group, like so:
Expand All @@ -821,7 +832,7 @@ public ParquetListConverter(Type prestoType, String columnName, GroupType listTy
// documentation at http://git.io/vOpNz.
parquet.schema.Type elementType = listType.getType(0);
if (isElementType(elementType, listType.getName())) {
elementConverter = createConverter(prestoType.getTypeParameters().get(0), columnName + ".element", elementType);
elementConverter = createConverter(prestoType.getTypeParameters().get(0), columnName + ".element", elementType, 0);
}
else {
elementConverter = new ParquetListEntryConverter(prestoType.getTypeParameters().get(0), columnName, elementType.asGroupType());
Expand Down Expand Up @@ -875,6 +886,9 @@ public void start()
currentEntryBuilder = nullBuilder.beginBlockEntry();
}
else {
while (builder.getPositionCount() < fieldIndex) {
builder.appendNull();
}
currentEntryBuilder = builder.beginBlockEntry();
}
elementConverter.beforeValue(currentEntryBuilder);
Expand Down Expand Up @@ -926,7 +940,7 @@ public ParquetListEntryConverter(Type prestoType, String columnName, GroupType e
columnName,
elementType.getFieldCount());

elementConverter = createConverter(prestoType, columnName + ".element", elementType.getType(0));
elementConverter = createConverter(prestoType, columnName + ".element", elementType.getType(0), 0);
}

@Override
Expand Down Expand Up @@ -969,20 +983,22 @@ private static class ParquetMapConverter
private static final int NULL_BUILDER_SIZE_IN_BYTES_THRESHOLD = 32768;

private final Type mapType;
private final int fieldIndex;

private final ParquetMapEntryConverter entryConverter;
private BlockBuilder builder;
private BlockBuilder nullBuilder; // used internally when builder is set to null
private BlockBuilder currentEntryBuilder;

public ParquetMapConverter(Type type, String columnName, GroupType mapType)
public ParquetMapConverter(Type type, String columnName, GroupType mapType, int fieldIndex)
{
checkArgument(mapType.getFieldCount() == 1,
"Expected MAP column '%s' to only have one field, but has %s fields",
mapType.getName(),
mapType.getFieldCount());

this.mapType = type;
this.fieldIndex = fieldIndex;

parquet.schema.Type entryType = mapType.getFields().get(0);

Expand Down Expand Up @@ -1014,6 +1030,9 @@ public void start()
currentEntryBuilder = nullBuilder.beginBlockEntry();
}
else {
while (builder.getPositionCount() < fieldIndex) {
builder.appendNull();
}
currentEntryBuilder = builder.beginBlockEntry();
}
entryConverter.beforeValue(currentEntryBuilder);
Expand Down Expand Up @@ -1084,8 +1103,8 @@ public ParquetMapEntryConverter(Type prestoType, String columnName, GroupType en
columnName,
entryGroupType.getType(0));

keyConverter = createConverter(prestoType.getTypeParameters().get(0), columnName + ".key", entryGroupType.getFields().get(0));
valueConverter = createConverter(prestoType.getTypeParameters().get(1), columnName + ".value", entryGroupType.getFields().get(1));
keyConverter = createConverter(prestoType.getTypeParameters().get(0), columnName + ".key", entryGroupType.getFields().get(0), 0);
valueConverter = createConverter(prestoType.getTypeParameters().get(1), columnName + ".value", entryGroupType.getFields().get(1), 1);
}

@Override
Expand Down Expand Up @@ -1131,12 +1150,14 @@ private static class ParquetPrimitiveConverter
implements BlockConverter
{
private final Type type;
private final int fieldIndex;
private BlockBuilder builder;
private boolean wroteValue;

/**
 * Converter for a primitive (leaf) Parquet column.
 *
 * @param type       Presto type the values are written as
 * @param fieldIndex position of this column within its enclosing group; used
 *                   by addMissingValues() to null-pad skipped fields
 */
public ParquetPrimitiveConverter(Type type, int fieldIndex)
{
    this.type = type;
    this.fieldIndex = fieldIndex;
}

@Override
Expand All @@ -1149,11 +1170,13 @@ public void beforeValue(BlockBuilder builder)
@Override
public void afterValue()
{
if (wroteValue) {
return;
}
}

builder.appendNull();
private void addMissingValues()
{
while (builder.getPositionCount() < fieldIndex) {
builder.appendNull();
}
}

@Override
Expand Down Expand Up @@ -1187,27 +1210,31 @@ public void addValueFromDictionary(int dictionaryId)
@Override
// Parquet callback for a BOOLEAN value: null-pad any skipped preceding
// fields, then write the value and mark this field as populated.
public void addBoolean(boolean value)
{
    addMissingValues();
    BOOLEAN.writeBoolean(builder, value);
    wroteValue = true;
}

@Override
// Parquet callback for a DOUBLE value: null-pad any skipped preceding
// fields, then write the value and mark this field as populated.
public void addDouble(double value)
{
    addMissingValues();
    DOUBLE.writeDouble(builder, value);
    wroteValue = true;
}

@Override
// Parquet callback for an INT64 value: null-pad any skipped preceding
// fields, then write the value as BIGINT and mark this field as populated.
public void addLong(long value)
{
    addMissingValues();
    BIGINT.writeLong(builder, value);
    wroteValue = true;
}

@Override
public void addBinary(Binary value)
{
addMissingValues();
if (type == TIMESTAMP) {
builder.writeLong(ParquetTimestampUtils.getTimestampMillis(value)).closeEntry();
}
Expand All @@ -1220,13 +1247,15 @@ public void addBinary(Binary value)
@Override
// Parquet callback for a FLOAT value: null-pad any skipped preceding fields,
// then widen to double and write as DOUBLE (presumably because this reader
// maps Parquet FLOAT to Presto DOUBLE — confirm against the type mapping).
public void addFloat(float value)
{
    addMissingValues();
    DOUBLE.writeDouble(builder, value);
    wroteValue = true;
}

@Override
// Parquet callback for an INT32 value: null-pad any skipped preceding
// fields, then widen to long and write as BIGINT.
public void addInt(int value)
{
    addMissingValues();
    BIGINT.writeLong(builder, value);
    wroteValue = true;
}
Expand Down

0 comments on commit 2fef94e

Please sign in to comment.