[SPARK-14015][SQL] Support TimestampType in vectorized parquet reader #11882

Closed · wants to merge 4 commits

@@ -213,6 +213,9 @@ void readBatch(int total, ColumnVector column) throws IOException {
case INT64:
readLongBatch(rowId, num, column);
break;
case INT96:
readBinaryBatch(rowId, num, column);
break;
case FLOAT:
readFloatBatch(rowId, num, column);
break;
@@ -249,7 +252,17 @@ private void decodeDictionaryIds(int rowId, int num, ColumnVector column,
case BINARY:
column.setDictionary(dictionary);
break;

case INT96:
if (column.dataType() == DataTypes.TimestampType) {
for (int i = rowId; i < rowId + num; ++i) {
// TODO: Convert dictionary of Binaries to dictionary of Longs
Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i));
column.putLong(i, CatalystRowConverter.binaryToSQLTimestamp(v));
}
} else {
throw new NotImplementedException();
}
break;

Review thread on the line "Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i));":

Contributor: Hm -- maybe we can do something a lot cheaper? At the very least, maybe we can remove the creation of this Binary object, since we are turning it immediately into a Long.

Member (author): Sure, that sounds good.

Member (author): Added a TODO for converting the dictionary of Binaries into a dictionary of Longs to make it cheaper.
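The TODO above points at pre-decoding the dictionary so the per-row work avoids a Binary allocation. A minimal sketch of that idea, reusing the names already in scope in decodeDictionaryIds (the local variable decodedDictionary is illustrative, not part of the patch):

// Decode each INT96 dictionary entry to a long once, then look values up
// by dictionary id without allocating a Binary per row.
long[] decodedDictionary = new long[dictionary.getMaxId() + 1];
for (int id = 0; id <= dictionary.getMaxId(); id++) {
  decodedDictionary[id] =
      CatalystRowConverter.binaryToSQLTimestamp(dictionary.decodeToBinary(id));
}
for (int i = rowId; i < rowId + num; ++i) {
  column.putLong(i, decodedDictionary[dictionaryIds.getInt(i)]);
}

In practice the decoded array would be built once per column chunk rather than per batch, so each row reduces to a single array lookup.
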
case FIXED_LEN_BYTE_ARRAY:
// DecimalType written in the legacy mode
if (DecimalType.is32BitDecimalType(column.dataType())) {
@@ -342,9 +355,19 @@ private void readDoubleBatch(int rowId, int num, ColumnVector column) throws IOException {
private void readBinaryBatch(int rowId, int num, ColumnVector column) throws IOException {
// This is where we implement support for the valid type conversions.
// TODO: implement remaining type conversions
VectorizedValuesReader data = (VectorizedValuesReader) dataColumn;
if (column.isArray()) {
defColumn.readBinarys(
num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
defColumn.readBinarys(num, column, rowId, maxDefLevel, data);
} else if (column.dataType() == DataTypes.TimestampType) {
for (int i = 0; i < num; i++) {
if (defColumn.readInteger() == maxDefLevel) {
column.putLong(rowId + i,
// Read 12 bytes for INT96
CatalystRowConverter.binaryToSQLTimestamp(data.readBinary(12)));
} else {
column.putNull(rowId + i);
}
}
} else {
throw new NotImplementedException("Unimplemented type: " + column.dataType());
}
@@ -252,26 +252,13 @@ private void initializeInternal() throws IOException {
/**
* Check that the requested schema is supported.
*/
OriginalType[] originalTypes = new OriginalType[requestedSchema.getFieldCount()];
missingColumns = new boolean[requestedSchema.getFieldCount()];
for (int i = 0; i < requestedSchema.getFieldCount(); ++i) {
Type t = requestedSchema.getFields().get(i);
if (!t.isPrimitive() || t.isRepetition(Type.Repetition.REPEATED)) {
throw new IOException("Complex types not supported.");
}
PrimitiveType primitiveType = t.asPrimitiveType();

originalTypes[i] = t.getOriginalType();

// TODO: Be extremely cautious in what is supported. Expand this.
if (originalTypes[i] != null && originalTypes[i] != OriginalType.DECIMAL &&
originalTypes[i] != OriginalType.UTF8 && originalTypes[i] != OriginalType.DATE &&
originalTypes[i] != OriginalType.INT_8 && originalTypes[i] != OriginalType.INT_16) {
throw new IOException("Unsupported type: " + t);
}
if (primitiveType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT96) {
throw new IOException("Int96 not supported.");
}
String[] colPath = requestedSchema.getPaths().get(i);
if (fileSchema.containsPath(colPath)) {
ColumnDescriptor fd = fileSchema.getColumnDescription(colPath);
@@ -407,7 +407,7 @@ private void reserveInternal(int newCapacity) {
type instanceof DateType || DecimalType.is32BitDecimalType(type)) {
this.data = Platform.reallocateMemory(data, elementsAppended * 4, newCapacity * 4);
} else if (type instanceof LongType || type instanceof DoubleType ||
DecimalType.is64BitDecimalType(type)) {
DecimalType.is64BitDecimalType(type) || type instanceof TimestampType) {
this.data = Platform.reallocateMemory(data, elementsAppended * 8, newCapacity * 8);
} else if (resultStruct != null) {
// Nothing to store.
@@ -403,7 +403,8 @@ private void reserveInternal(int newCapacity) {
int[] newData = new int[newCapacity];
if (intData != null) System.arraycopy(intData, 0, newData, 0, elementsAppended);
intData = newData;
} else if (type instanceof LongType || DecimalType.is64BitDecimalType(type)) {
} else if (type instanceof LongType || type instanceof TimestampType ||
DecimalType.is64BitDecimalType(type)) {
long[] newData = new long[newCapacity];
if (longData != null) System.arraycopy(longData, 0, newData, 0, elementsAppended);
longData = newData;
@@ -33,6 +33,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, GenericArrayData}
import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLTimestamp
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

@@ -659,4 +660,13 @@ private[parquet] object CatalystRowConverter {
unscaled = (unscaled << (64 - bits)) >> (64 - bits)
unscaled
}

def binaryToSQLTimestamp(binary: Binary): SQLTimestamp = {
assert(binary.length() == 12, s"Timestamps (with nanoseconds) are expected to be stored in" +
s" 12-byte long binaries. Found a ${binary.length()}-byte binary instead.")
val buffer = binary.toByteBuffer.order(ByteOrder.LITTLE_ENDIAN)
val timeOfDayNanos = buffer.getLong
val julianDay = buffer.getInt
DateTimeUtils.fromJulianDay(julianDay, timeOfDayNanos)
}
}
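For reference, binaryToSQLTimestamp relies on the INT96 layout that parquet-mr and Impala use for timestamps: an 8-byte little-endian long holding the nanoseconds within the day, followed by a 4-byte little-endian int holding the Julian day number. A small standalone Java sketch of that layout (the class and method names are illustrative, not part of the patch):

import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class Int96TimestampLayout {
  // Pack a Julian day and nanos-of-day into the 12-byte form that
  // binaryToSQLTimestamp above expects to receive.
  static byte[] encode(int julianDay, long timeOfDayNanos) {
    ByteBuffer buf = ByteBuffer.allocate(12).order(ByteOrder.LITTLE_ENDIAN);
    buf.putLong(timeOfDayNanos);
    buf.putInt(julianDay);
    return buf.array();
  }

  public static void main(String[] args) {
    // Julian day 2440588 corresponds to 1970-01-01, the Unix/SQL epoch day.
    byte[] bytes = encode(2440588, 0L);
    ByteBuffer buf = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN);
    System.out.println("nanos=" + buf.getLong() + ", julianDay=" + buf.getInt());
  }
}
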
@@ -116,44 +116,56 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
new MyDenseVectorUDT()
).filter(supportsDataType)

for (dataType <- supportedDataTypes) {
test(s"test all data types - $dataType") {
withTempPath { file =>
val path = file.getCanonicalPath

val dataGenerator = RandomDataGenerator.forType(
dataType = dataType,
nullable = true,
new Random(System.nanoTime())
).getOrElse {
fail(s"Failed to create data generator for schema $dataType")
try {
for (dataType <- supportedDataTypes) {
for (parquetDictionaryEncodingEnabled <- Seq(true, false)) {
test(s"test all data types - $dataType with parquet.enable.dictionary = " +
s"$parquetDictionaryEncodingEnabled") {

hadoopConfiguration.setBoolean("parquet.enable.dictionary",
parquetDictionaryEncodingEnabled)

withTempPath { file =>
val path = file.getCanonicalPath

val dataGenerator = RandomDataGenerator.forType(
dataType = dataType,
nullable = true,
new Random(System.nanoTime())
).getOrElse {
fail(s"Failed to create data generator for schema $dataType")
}

// Create a DF for the schema with random data. The index field is used to sort the
// DataFrame. This is a workaround for SPARK-10591.
val schema = new StructType()
.add("index", IntegerType, nullable = false)
.add("col", dataType, nullable = true)
val rdd =
sqlContext.sparkContext.parallelize((1 to 10).map(i => Row(i, dataGenerator())))
val df = sqlContext.createDataFrame(rdd, schema).orderBy("index").coalesce(1)

df.write
.mode("overwrite")
.format(dataSourceName)
.option("dataSchema", df.schema.json)
.save(path)

val loadedDF = sqlContext
.read
.format(dataSourceName)
.option("dataSchema", df.schema.json)
.schema(df.schema)
.load(path)
.orderBy("index")

checkAnswer(loadedDF, df)
}
}

// Create a DF for the schema with random data. The index field is used to sort the
// DataFrame. This is a workaround for SPARK-10591.
val schema = new StructType()
.add("index", IntegerType, nullable = false)
.add("col", dataType, nullable = true)
val rdd = sqlContext.sparkContext.parallelize((1 to 10).map(i => Row(i, dataGenerator())))
val df = sqlContext.createDataFrame(rdd, schema).orderBy("index").coalesce(1)

df.write
.mode("overwrite")
.format(dataSourceName)
.option("dataSchema", df.schema.json)
.save(path)

val loadedDF = sqlContext
.read
.format(dataSourceName)
.option("dataSchema", df.schema.json)
.schema(df.schema)
.load(path)
.orderBy("index")

checkAnswer(loadedDF, df)
}
}
} finally {
hadoopConfiguration.unset("parquet.enable.dictionary")
}

test("save()/load() - non-partitioned table - Overwrite") {