Skip to content

Commit

Permalink
PARQUET-251: Binary column statistics error when reuse byte[] among rows
Browse files Browse the repository at this point in the history
Author: asingh <asingh@cloudera.com>
Author: Alex Levenson <alexlevenson@twitter.com>
Author: Ashish Singh <asingh@cloudera.com>

Closes apache#197 from SinghAsDev/PARQUET-251 and squashes the following commits:

68e0eae [asingh] Remove deprecated constructors from private classes
67e4e5f [asingh] Add removed public methods in Binary and deprecate them
0e71728 [asingh] Add comment for BinaryStatistics.setMinMaxFromBytes
fbe873f [Ashish Singh] Merge pull request apache#4 from isnotinvain/PR-197-3
9826ee6 [Alex Levenson] Some minor cleanup
7570035 [asingh] Remove test for stats getting ingnored for version 160 when type is int64
af43d28 [Alex Levenson] Address PR feedback
89ab4ee [Alex Levenson] put the headers in the right location
2838cc9 [Alex Levenson] Split out version checks to separate files, add some tests
5af9142 [Alex Levenson] Generalize tests, make Binary.fromString reused=false
e00d9b7 [asingh] Rename isReused => isBackingBytesReused
d2ad939 [asingh] Rebase over latest trunk
857141a [asingh] Remove redundant junit dependency
32b88ed [asingh] Remove semver from hadoop-common
7a0e99e [asingh] Revert to fromConstantByteArray for ByteString
c820ec9 [asingh] Add unit tests for Binary and to check if stats are ignored for version 160
9bbd1e5 [asingh] Improve version parsing
84a1d8b [asingh] Remove ignoring stats on write side and ignore it on read side
903f8e3 [asingh] Address some review comments. * Ignore stats for writer's version < 1.8.0 * Refactor shoudlIgnoreStatistics method a bit * Assume implementations other than parquet-mr were writing binary   statistics correctly * Add toParquetStatistics method's original method signature to maintain   backwards compatibility and mark it as deprecated
64c2617 [asingh] Revert changes for ignoring stats at RowGroupFilter level
e861b18 [asingh] Ignore max min stats while reading
3a8cb8d [asingh] Fix typo
8e12618 [asingh] Fix usage of fromConstant versions of Binary constructors
860adf7 [asingh] Rename unmodified to constant and isReused instead of isUnmodifiable
0d127a7 [asingh] Add unmodfied and Reused versions for creating a Binary. Add copy() to Binary.
b4e2950 [asingh] Skip filtering based on stats when file was written with version older than 1.6.1
6fcee8c [asingh] Add getBytesUnsafe() to Binary that returns backing byte[] if possible, else returns result of getBytes()
30b07dd [asingh] PARQUET-251: Binary column statistics error when reuse byte[] among rows
  • Loading branch information
asingh authored and isnotinvain committed Jul 1, 2015
1 parent e6ee42e commit e3b9502
Show file tree
Hide file tree
Showing 42 changed files with 1,044 additions and 121 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -252,9 +252,9 @@ private void writeValue(Type type, Schema avroSchema, Object value) {
recordConsumer.addDouble(((Number) value).doubleValue());
} else if (avroType.equals(Schema.Type.BYTES)) {
if (value instanceof byte[]) {
recordConsumer.addBinary(Binary.fromByteArray((byte[]) value));
recordConsumer.addBinary(Binary.fromReusedByteArray((byte[]) value));
} else {
recordConsumer.addBinary(Binary.fromByteBuffer((ByteBuffer) value));
recordConsumer.addBinary(Binary.fromReusedByteBuffer((ByteBuffer) value));
}
} else if (avroType.equals(Schema.Type.STRING)) {
recordConsumer.addBinary(fromAvroString(value));
Expand All @@ -269,14 +269,14 @@ private void writeValue(Type type, Schema avroSchema, Object value) {
} else if (avroType.equals(Schema.Type.UNION)) {
writeUnion(type.asGroupType(), nonNullAvroSchema, value);
} else if (avroType.equals(Schema.Type.FIXED)) {
recordConsumer.addBinary(Binary.fromByteArray(((GenericFixed) value).bytes()));
recordConsumer.addBinary(Binary.fromReusedByteArray(((GenericFixed) value).bytes()));
}
}

private Binary fromAvroString(Object value) {
if (value instanceof Utf8) {
Utf8 utf8 = (Utf8) value;
return Binary.fromByteArray(utf8.getBytes(), 0, utf8.getByteLength());
return Binary.fromReusedByteArray(utf8.getBytes(), 0, utf8.getByteLength());
}
return Binary.fromString(value.toString());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,8 @@ public void write(Map<String, Object> record) {
recordConsumer.endField("mydouble", index++);

recordConsumer.startField("mybytes", index);
recordConsumer.addBinary(Binary.fromByteBuffer((ByteBuffer) record.get("mybytes")));
recordConsumer.addBinary(
Binary.fromReusedByteBuffer((ByteBuffer) record.get("mybytes")));
recordConsumer.endField("mybytes", index++);

recordConsumer.startField("mystring", index);
Expand Down Expand Up @@ -457,7 +458,7 @@ public void write(Map<String, Object> record) {
recordConsumer.endField("mymap", index++);

recordConsumer.startField("myfixed", index);
recordConsumer.addBinary(Binary.fromByteArray((byte[]) record.get("myfixed")));
recordConsumer.addBinary(Binary.fromReusedByteArray((byte[]) record.get("myfixed")));
recordConsumer.endField("myfixed", index++);

recordConsumer.endMessage();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ public void write(Map<String, Object> record) {
recordConsumer.endField("mydouble", index++);

recordConsumer.startField("mybytes", index);
recordConsumer.addBinary(Binary.fromByteBuffer((ByteBuffer) record.get("mybytes")));
recordConsumer.addBinary(Binary.fromReusedByteBuffer((ByteBuffer) record.get("mybytes")));
recordConsumer.endField("mybytes", index++);

recordConsumer.startField("mystring", index);
Expand Down Expand Up @@ -499,7 +499,7 @@ public void write(Map<String, Object> record) {
recordConsumer.endField("mymap", index++);

recordConsumer.startField("myfixed", index);
recordConsumer.addBinary(Binary.fromByteArray((byte[]) record.get("myfixed")));
recordConsumer.addBinary(Binary.fromReusedByteArray((byte[]) record.get("myfixed")));
recordConsumer.endField("myfixed", index++);

recordConsumer.endMessage();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public void generateData(Path outFile, Configuration configuration, ParquetPrope
.append("float_field", 1.0f)
.append("double_field", 2.0d)
.append("flba_field", new String(chars))
.append("int96_field", Binary.fromByteArray(new byte[12]))
.append("int96_field", Binary.fromConstantByteArray(new byte[12]))
);
}
writer.close();
Expand Down
104 changes: 104 additions & 0 deletions parquet-column/src/main/java/org/apache/parquet/CorruptStatistics.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet;

import org.apache.parquet.SemanticVersion.SemanticVersionParseException;
import org.apache.parquet.VersionParser.ParsedVersion;
import org.apache.parquet.VersionParser.VersionParseException;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;

/**
* There was a bug (PARQUET-251) that caused the statistics metadata
* for binary columns to be corrupted in the write path.
*
* This class is used to detect whether a file was written with this bug,
* and thus it's statistics should be ignored / not trusted.
*/
public class CorruptStatistics {
private static final Log LOG = Log.getLog(CorruptStatistics.class);

// the version in which the bug described by jira: PARQUET-251 was fixed
// the bug involved writing invalid binary statistics, so stats written prior to this
// fix must be ignored / assumed invalid
private static final SemanticVersion PARQUET_251_FIXED_VERSION = new SemanticVersion(1, 8, 0);

/**
* Decides if the statistics from a file created by createdBy (the created_by field from parquet format)
* should be ignored because they are potentially corrupt.
*/
public static boolean shouldIgnoreStatistics(String createdBy, PrimitiveTypeName columnType) {

if (columnType != PrimitiveTypeName.BINARY && columnType != PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
// the bug only applies to binary columns
return false;
}

if (Strings.isNullOrEmpty(createdBy)) {
// created_by is not populated, which could have been caused by
// parquet-mr during the same time as PARQUET-251, see PARQUET-297
LOG.info("Ignoring statistics because created_by is null or empty! See PARQUET-251 and PARQUET-297");
return true;
}

try {
ParsedVersion version = VersionParser.parse(createdBy);

if (!"parquet-mr".equals(version.application)) {
// assume other applications don't have this bug
return false;
}

if (Strings.isNullOrEmpty(version.semver)) {
LOG.warn("Ignoring statistics because created_by did not contain a semver (see PARQUET-251): " + createdBy);
return true;
}

SemanticVersion semver = SemanticVersion.parse(version.semver);

if (semver.compareTo(PARQUET_251_FIXED_VERSION) < 0) {
LOG.info("Ignoring statistics because this file was created prior to "
+ PARQUET_251_FIXED_VERSION
+ ", see PARQUET-251" );
return true;
}

// this file was created after the fix
return false;
} catch (RuntimeException e) {
// couldn't parse the created_by field, log what went wrong, don't trust the stats,
// but don't make this fatal.
warnParseError(createdBy, e);
return true;
} catch (SemanticVersionParseException e) {
// couldn't parse the created_by field, log what went wrong, don't trust the stats,
// but don't make this fatal.
warnParseError(createdBy, e);
return true;
} catch (VersionParseException e) {
// couldn't parse the created_by field, log what went wrong, don't trust the stats,
// but don't make this fatal.
warnParseError(createdBy, e);
return true;
}
}

private static void warnParseError(String createdBy, Throwable e) {
LOG.warn("Ignoring statistics because created_by could not be parsed (see PARQUET-251): " + createdBy, e);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,16 @@ public void mergeStatisticsMinMax(Statistics stats) {
}
}

/**
* Sets min and max values, re-uses the byte[] passed in.
* Any changes made to byte[] will be reflected in min and max values as well.
* @param minBytes byte array to set the min value to
* @param maxBytes byte array to set the max value to
*/
@Override
public void setMinMaxFromBytes(byte[] minBytes, byte[] maxBytes) {
max = Binary.fromByteArray(maxBytes);
min = Binary.fromByteArray(minBytes);
max = Binary.fromReusedByteArray(maxBytes);
min = Binary.fromReusedByteArray(minBytes);
this.markAsNotEmpty();
}

Expand All @@ -72,13 +78,13 @@ else if (!this.isEmpty())
}

public void updateStats(Binary min_value, Binary max_value) {
if (min.compareTo(min_value) > 0) { min = min_value; }
if (max.compareTo(max_value) < 0) { max = max_value; }
if (min.compareTo(min_value) > 0) { min = min_value.copy(); }
if (max.compareTo(max_value) < 0) { max = max_value.copy(); }
}

public void initializeStats(Binary min_value, Binary max_value) {
min = min_value;
max = max_value;
min = min_value.copy();
max = max_value.copy();
this.markAsNotEmpty();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public Binary readBytes() {
int length = lengthReader.readInteger();
int start = offset;
offset = start + length;
return Binary.fromByteArray(in, start, length);
return Binary.fromConstantByteArray(in, start, length);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public DeltaLengthByteArrayValuesWriter(int initialSize, int pageSize) {
public void writeBytes(Binary v) {
try {
lengthWriter.writeInteger(v.length());
out.write(v.getBytes());
v.writeTo(out);
} catch (IOException e) {
throw new ParquetEncodingException("could not write bytes", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class DeltaByteArrayReader extends ValuesReader {
public DeltaByteArrayReader() {
this.prefixLengthReader = new DeltaBinaryPackingValuesReader();
this.suffixReader = new DeltaLengthByteArrayValuesReader();
this.previous = Binary.fromByteArray(new byte[0]);
this.previous = Binary.fromConstantByteArray(new byte[0]);
}

@Override
Expand All @@ -67,9 +67,9 @@ public Binary readBytes() {
// We have to do this to materialize the output
if(prefixLength != 0) {
byte[] out = new byte[length];
System.arraycopy(previous.getBytes(), 0, out, 0, prefixLength);
System.arraycopy(suffix.getBytes(), 0, out, prefixLength, suffix.length());
previous = Binary.fromByteArray(out);
System.arraycopy(previous.getBytesUnsafe(), 0, out, 0, prefixLength);
System.arraycopy(suffix.getBytesUnsafe(), 0, out, prefixLength, suffix.length());
previous = Binary.fromConstantByteArray(out);
} else {
previous = suffix;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public void writeBytes(Binary v) {
int length = previous.length < vb.length ? previous.length : vb.length;
for(i = 0; (i < length) && (previous[i] == vb[i]); i++);
prefixLengthWriter.writeInteger(i);
suffixWriter.writeBytes(Binary.fromByteArray(vb, i, vb.length - i));
suffixWriter.writeBytes(v.slice(i, vb.length - i));
previous = vb;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ public void writeBytes(Binary v) {
int id = binaryDictionaryContent.getInt(v);
if (id == -1) {
id = binaryDictionaryContent.size();
binaryDictionaryContent.put(copy(v), id);
binaryDictionaryContent.put(v.copy(), id);
// length as int (4 bytes) + actual bytes
dictionaryByteSize += 4 + v.length();
}
Expand Down Expand Up @@ -283,11 +283,6 @@ public void fallBackDictionaryEncodedData(ValuesWriter writer) {
writer.writeBytes(reverseDictionary[id]);
}
}

protected static Binary copy(Binary binary) {
return Binary.fromByteArray(
Arrays.copyOf(binary.getBytes(), binary.length()));
}
}

/**
Expand All @@ -311,7 +306,7 @@ public void writeBytes(Binary value) {
int id = binaryDictionaryContent.getInt(value);
if (id == -1) {
id = binaryDictionaryContent.size();
binaryDictionaryContent.put(copy(value), id);
binaryDictionaryContent.put(value.copy(), id);
dictionaryByteSize += length;
}
encodedValues.add(id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public PlainBinaryDictionary(DictionaryPage dictionaryPage, Integer length) thro
// read the length
offset += 4;
// wrap the content in a binary
binaryDictionaryContent[i] = Binary.fromByteArray(dictionaryBytes, offset, len);
binaryDictionaryContent[i] = Binary.fromConstantByteArray(dictionaryBytes, offset, len);
// increment to the next value
offset += len;
}
Expand All @@ -106,7 +106,7 @@ public PlainBinaryDictionary(DictionaryPage dictionaryPage, Integer length) thro
"Invalid byte array length: " + length);
for (int i = 0; i < binaryDictionaryContent.length; i++) {
// wrap the content in a Binary
binaryDictionaryContent[i] = Binary.fromByteArray(
binaryDictionaryContent[i] = Binary.fromConstantByteArray(
dictionaryBytes, offset, length);
// increment to the next value
offset += length;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public Binary readBytes() {
int length = BytesUtils.readIntLittleEndian(in, offset);
int start = offset + 4;
offset = start + length;
return Binary.fromByteArray(in, start, length);
return Binary.fromConstantByteArray(in, start, length);
} catch (IOException e) {
throw new ParquetDecodingException("could not read bytes at offset " + offset, e);
} catch (RuntimeException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public Binary readBytes() {
try {
int start = offset;
offset = start + length;
return Binary.fromByteArray(in, start, length);
return Binary.fromConstantByteArray(in, start, length);
} catch (RuntimeException e) {
throw new ParquetDecodingException("could not read bytes at offset " + offset, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public Binary toBinary() {
buf.putLong(timeOfDayNanos);
buf.putInt(julianDay);
buf.flip();
return Binary.fromByteBuffer(buf);
return Binary.fromConstantByteBuffer(buf);
}

public Int96Value toInt96() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public void addBoolean(boolean value) {
*/
@Override
public void addBinary(Binary value) {
if (DEBUG) log(Arrays.toString(value.getBytes()));
if (DEBUG) log(Arrays.toString(value.getBytesUnsafe()));
delegate.addBinary(value);
}

Expand Down
Loading

0 comments on commit e3b9502

Please sign in to comment.