Skip to content

Commit

Permalink
Append nulls for missing values in Parquet.
Browse files Browse the repository at this point in the history
Parquet only invokes converters for values that it actually found; missing
values are not reported. The BlockBuilder must therefore be padded with
nulls for the missing values, based on the fieldIndex of the value
currently being read by Parquet.
  • Loading branch information
Sailesh Mittal committed Oct 1, 2015
1 parent 5e3a182 commit bce8187
Showing 1 changed file with 50 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ public PrestoReadSupport(boolean useParquetColumnNames, List<HiveColumnHandle> c
converters.add(new ParquetPrimitiveColumnConverter(i));
}
else {
converters.add(new ParquetColumnConverter(createGroupConverter(types[i], parquetType.getName(), parquetType), i));
converters.add(new ParquetColumnConverter(createGroupConverter(types[i], parquetType.getName(), parquetType, i), i));
}
}
}
Expand Down Expand Up @@ -674,25 +674,25 @@ private abstract static class GroupedConverter
public abstract Block getBlock();
}

private static BlockConverter createConverter(Type prestoType, String columnName, parquet.schema.Type parquetType)
private static BlockConverter createConverter(Type prestoType, String columnName, parquet.schema.Type parquetType, int fieldIndex)
{
if (parquetType.isPrimitive()) {
return new ParquetPrimitiveConverter(prestoType);
return new ParquetPrimitiveConverter(prestoType, fieldIndex);
}

return createGroupConverter(prestoType, columnName, parquetType);
return createGroupConverter(prestoType, columnName, parquetType, fieldIndex);
}

private static GroupedConverter createGroupConverter(Type prestoType, String columnName, parquet.schema.Type parquetType)
private static GroupedConverter createGroupConverter(Type prestoType, String columnName, parquet.schema.Type parquetType, int fieldIndex)
{
GroupType groupType = parquetType.asGroupType();
switch (prestoType.getTypeSignature().getBase()) {
case ARRAY:
return new ParquetListConverter(prestoType, columnName, groupType);
return new ParquetListConverter(prestoType, columnName, groupType, fieldIndex);
case MAP:
return new ParquetMapConverter(prestoType, columnName, groupType);
return new ParquetMapConverter(prestoType, columnName, groupType, fieldIndex);
case ROW:
return new ParquetStructConverter(prestoType, columnName, groupType);
return new ParquetStructConverter(prestoType, columnName, groupType, fieldIndex);
default:
throw new IllegalArgumentException("Column " + columnName + " type " + parquetType.getOriginalType() + " not supported");
}
Expand All @@ -705,25 +705,27 @@ private static class ParquetStructConverter
private static final int NULL_BUILDER_SIZE_IN_BYTES_THRESHOLD = 32768;

private final Type rowType;
private final int fieldIndex;

private final List<BlockConverter> converters;
private BlockBuilder builder;
private BlockBuilder nullBuilder; // used internally when builder is set to null
private BlockBuilder currentEntryBuilder;

public ParquetStructConverter(Type prestoType, String columnName, GroupType entryType)
public ParquetStructConverter(Type prestoType, String columnName, GroupType entryType, int fieldIndex)
{
checkArgument(ROW.equals(prestoType.getTypeSignature().getBase()));
List<Type> prestoTypeParameters = prestoType.getTypeParameters();
List<parquet.schema.Type> fieldTypes = entryType.getFields();
checkArgument(prestoTypeParameters.size() == fieldTypes.size());

this.rowType = prestoType;
this.fieldIndex = fieldIndex;

ImmutableList.Builder<BlockConverter> converters = ImmutableList.builder();
for (int i = 0; i < prestoTypeParameters.size(); i++) {
parquet.schema.Type fieldType = fieldTypes.get(i);
converters.add(createConverter(prestoTypeParameters.get(i), columnName + "." + fieldType.getName(), fieldType));
converters.add(createConverter(prestoTypeParameters.get(i), columnName + "." + fieldType.getName(), fieldType, i));
}
this.converters = converters.build();
}
Expand All @@ -750,6 +752,9 @@ public void start()
currentEntryBuilder = nullBuilder.beginBlockEntry();
}
else {
while (builder.getPositionCount() < fieldIndex) {
builder.appendNull();
}
currentEntryBuilder = builder.beginBlockEntry();
}
for (BlockConverter converter : converters) {
Expand All @@ -763,6 +768,10 @@ public void end()
for (BlockConverter converter : converters) {
converter.afterValue();
}
while (currentEntryBuilder.getPositionCount() < converters.size()) {
currentEntryBuilder.appendNull();
}

if (builder == null) {
nullBuilder.closeEntry();
}
Expand Down Expand Up @@ -791,13 +800,14 @@ private static class ParquetListConverter
private static final int NULL_BUILDER_SIZE_IN_BYTES_THRESHOLD = 32768;

private final Type arrayType;
private final int fieldIndex;

private final BlockConverter elementConverter;
private BlockBuilder builder;
private BlockBuilder nullBuilder; // used internally when builder is set to null
private BlockBuilder currentEntryBuilder;

public ParquetListConverter(Type prestoType, String columnName, GroupType listType)
public ParquetListConverter(Type prestoType, String columnName, GroupType listType, int fieldIndex)
{
checkArgument(listType.getFieldCount() == 1,
"Expected LIST column '%s' to only have one field, but has %s fields",
Expand All @@ -806,6 +816,7 @@ public ParquetListConverter(Type prestoType, String columnName, GroupType listTy
checkArgument(ARRAY.equals(prestoType.getTypeSignature().getBase()));

this.arrayType = prestoType;
this.fieldIndex = fieldIndex;

// The Parquet specification requires that the element value of a
// LIST type be wrapped in an inner repeated group, like so:
Expand All @@ -821,7 +832,7 @@ public ParquetListConverter(Type prestoType, String columnName, GroupType listTy
// documentation at http://git.io/vOpNz.
parquet.schema.Type elementType = listType.getType(0);
if (isElementType(elementType, listType.getName())) {
elementConverter = createConverter(prestoType.getTypeParameters().get(0), columnName + ".element", elementType);
elementConverter = createConverter(prestoType.getTypeParameters().get(0), columnName + ".element", elementType, 0);
}
else {
elementConverter = new ParquetListEntryConverter(prestoType.getTypeParameters().get(0), columnName, elementType.asGroupType());
Expand Down Expand Up @@ -875,6 +886,9 @@ public void start()
currentEntryBuilder = nullBuilder.beginBlockEntry();
}
else {
while (builder.getPositionCount() < fieldIndex) {
builder.appendNull();
}
currentEntryBuilder = builder.beginBlockEntry();
}
elementConverter.beforeValue(currentEntryBuilder);
Expand Down Expand Up @@ -926,7 +940,7 @@ public ParquetListEntryConverter(Type prestoType, String columnName, GroupType e
columnName,
elementType.getFieldCount());

elementConverter = createConverter(prestoType, columnName + ".element", elementType.getType(0));
elementConverter = createConverter(prestoType, columnName + ".element", elementType.getType(0), 0);
}

@Override
Expand Down Expand Up @@ -969,20 +983,22 @@ private static class ParquetMapConverter
private static final int NULL_BUILDER_SIZE_IN_BYTES_THRESHOLD = 32768;

private final Type mapType;
private final int fieldIndex;

private final ParquetMapEntryConverter entryConverter;
private BlockBuilder builder;
private BlockBuilder nullBuilder; // used internally when builder is set to null
private BlockBuilder currentEntryBuilder;

public ParquetMapConverter(Type type, String columnName, GroupType mapType)
public ParquetMapConverter(Type type, String columnName, GroupType mapType, int fieldIndex)
{
checkArgument(mapType.getFieldCount() == 1,
"Expected MAP column '%s' to only have one field, but has %s fields",
mapType.getName(),
mapType.getFieldCount());

this.mapType = type;
this.fieldIndex = fieldIndex;

parquet.schema.Type entryType = mapType.getFields().get(0);

Expand Down Expand Up @@ -1014,6 +1030,9 @@ public void start()
currentEntryBuilder = nullBuilder.beginBlockEntry();
}
else {
while (builder.getPositionCount() < fieldIndex) {
builder.appendNull();
}
currentEntryBuilder = builder.beginBlockEntry();
}
entryConverter.beforeValue(currentEntryBuilder);
Expand Down Expand Up @@ -1084,8 +1103,8 @@ public ParquetMapEntryConverter(Type prestoType, String columnName, GroupType en
columnName,
entryGroupType.getType(0));

keyConverter = createConverter(prestoType.getTypeParameters().get(0), columnName + ".key", entryGroupType.getFields().get(0));
valueConverter = createConverter(prestoType.getTypeParameters().get(1), columnName + ".value", entryGroupType.getFields().get(1));
keyConverter = createConverter(prestoType.getTypeParameters().get(0), columnName + ".key", entryGroupType.getFields().get(0), 0);
valueConverter = createConverter(prestoType.getTypeParameters().get(1), columnName + ".value", entryGroupType.getFields().get(1), 1);
}

@Override
Expand Down Expand Up @@ -1131,12 +1150,14 @@ private static class ParquetPrimitiveConverter
implements BlockConverter
{
private final Type type;
private final int fieldIndex;
private BlockBuilder builder;
private boolean wroteValue;

public ParquetPrimitiveConverter(Type type)
// Converter for a single primitive Parquet column.
// fieldIndex is this column's ordinal within the enclosing group; it is used
// by addMissingValues() to backfill nulls for preceding fields that Parquet
// never reported before this column's value is written.
public ParquetPrimitiveConverter(Type type, int fieldIndex)
{
this.type = type;
this.fieldIndex = fieldIndex;
}

@Override
Expand All @@ -1149,11 +1170,13 @@ public void beforeValue(BlockBuilder builder)
@Override
public void afterValue()
{
if (wroteValue) {
return;
}
}

builder.appendNull();
// Parquet reports nothing for absent fields, so before writing a value we
// pad the builder with nulls until it has reached this converter's slot
// (fieldIndex). No-op when the builder is already at or past that position.
private void addMissingValues()
{
int missing = fieldIndex - builder.getPositionCount();
for (int i = 0; i < missing; i++) {
builder.appendNull();
}
}

@Override
Expand Down Expand Up @@ -1187,27 +1210,31 @@ public void addValueFromDictionary(int dictionaryId)
@Override
public void addBoolean(boolean value)
{
// Backfill nulls for any earlier fields Parquet skipped, so this value
// lands at the correct position in the builder.
addMissingValues();
BOOLEAN.writeBoolean(builder, value);
// Record that a real (non-null) value was produced for this position.
wroteValue = true;
}

@Override
public void addDouble(double value)
{
// Backfill nulls for any earlier fields Parquet skipped, so this value
// lands at the correct position in the builder.
addMissingValues();
DOUBLE.writeDouble(builder, value);
// Record that a real (non-null) value was produced for this position.
wroteValue = true;
}

@Override
public void addLong(long value)
{
// Backfill nulls for any earlier fields Parquet skipped, so this value
// lands at the correct position in the builder.
addMissingValues();
BIGINT.writeLong(builder, value);
// Record that a real (non-null) value was produced for this position.
wroteValue = true;
}

@Override
public void addBinary(Binary value)
{
addMissingValues();
if (type == TIMESTAMP) {
builder.writeLong(ParquetTimestampUtils.getTimestampMillis(value)).closeEntry();
}
Expand All @@ -1220,13 +1247,15 @@ public void addBinary(Binary value)
@Override
public void addFloat(float value)
{
// Backfill nulls for any earlier fields Parquet skipped, so this value
// lands at the correct position in the builder.
addMissingValues();
// Parquet floats are widened and stored through Presto's DOUBLE type.
DOUBLE.writeDouble(builder, value);
// Record that a real (non-null) value was produced for this position.
wroteValue = true;
}

@Override
public void addInt(int value)
{
// Backfill nulls for any earlier fields Parquet skipped, so this value
// lands at the correct position in the builder.
addMissingValues();
// Parquet ints are widened and stored through Presto's BIGINT type.
BIGINT.writeLong(builder, value);
// Record that a real (non-null) value was produced for this position.
wroteValue = true;
}
Expand Down

0 comments on commit bce8187

Please sign in to comment.