Skip to content

Commit 0c37639

Browse files
committed
fix int96
1 parent b711c8e commit 0c37639

File tree

4 files changed

+26
-79
lines changed

4 files changed

+26
-79
lines changed

velox/dwio/common/SelectiveColumnReader.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,10 @@ void SelectiveColumnReader::getIntValues(
190190
getFlatValues<UnscaledLongDecimal, UnscaledLongDecimal>(
191191
rows, result, requestedType);
192192
break;
193+
case TypeKind::TIMESTAMP:
194+
getFlatValues<Timestamp, Timestamp>(
195+
rows, result, requestedType);
196+
break;
193197
case TypeKind::BIGINT:
194198
switch (valueSize_) {
195199
case 8:

velox/dwio/parquet/reader/PageReader.cpp

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -414,8 +414,9 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) {
414414
break;
415415
}
416416
case thrift::Type::INT96: {
417+
auto numVeloxBytes = dictionary_.numValues * sizeof(Timestamp);
418+
dictionary_.values = AlignedBuffer::allocate<char>(numVeloxBytes, &pool_);
417419
auto numBytes = dictionary_.numValues * sizeof(int96_t);
418-
dictionary_.values = AlignedBuffer::allocate<char>(numBytes, &pool_);
419420
if (pageData_) {
420421
memcpy(dictionary_.values->asMutable<char>(), pageData_, numBytes);
421422
} else {
@@ -426,6 +427,23 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) {
426427
bufferStart_,
427428
bufferEnd_);
428429
}
430+
// Expand the Parquet type length values to Velox type length.
431+
// We start from the end to allow in-place expansion.
432+
auto values = dictionary_.values->asMutable<Timestamp>();
433+
auto parquetValues = dictionary_.values->asMutable<char>();
434+
constexpr int64_t JULIAN_TO_UNIX_EPOCH_DAYS = 2440588LL;
435+
constexpr int64_t SECONDS_PER_DAY = 86400LL;
436+
for (auto i = dictionary_.numValues - 1; i >= 0; --i) {
437+
// Convert the timestamp into seconds and nanos since the Unix epoch,
438+
// 00:00:00.000000 on 1 January 1970.
439+
uint64_t nanos;
440+
memcpy(&nanos, parquetValues + i * sizeof(int96_t), sizeof(uint64_t));
441+
int32_t days;
442+
memcpy(&days, parquetValues + i * sizeof(int96_t) + + sizeof(uint64_t),
443+
sizeof(int32_t));
444+
values[i] = Timestamp(
445+
(days - JULIAN_TO_UNIX_EPOCH_DAYS) * SECONDS_PER_DAY, nanos);
446+
}
429447
break;
430448
}
431449
case thrift::Type::BYTE_ARRAY: {

velox/dwio/parquet/reader/TimestampColumnReader.h

Lines changed: 0 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -41,82 +41,6 @@ class TimestampColumnReader : public IntegerColumnReader {
4141
prepareRead<int128_t>(offset, rows, nullptr);
4242
readCommon<IntegerColumnReader>(rows);
4343
}
44-
45-
void getValues(RowSet rows, VectorPtr* result) override {
46-
auto type = nodeType_->type;
47-
VELOX_CHECK(type->kind() == TypeKind::TIMESTAMP, "Timestamp expected.");
48-
VELOX_CHECK_NE(valueSize_, kNoValueSize);
49-
VELOX_CHECK(mayGetValues_);
50-
if (allNull_) {
51-
*result = std::make_shared<ConstantVector<Timestamp>>(
52-
&memoryPool_,
53-
rows.size(),
54-
true,
55-
type,
56-
Timestamp(),
57-
SimpleVectorStats<Timestamp>{},
58-
sizeof(Timestamp) * rows.size());
59-
return;
60-
}
61-
VELOX_CHECK_LE(rows.size(), numValues_);
62-
VELOX_CHECK(!rows.empty());
63-
if (!values_) {
64-
return;
65-
}
66-
67-
auto tsValues = AlignedBuffer::allocate<Timestamp>(
68-
rows.size(), &memoryPool_, Timestamp());
69-
auto* valuesPtr = tsValues->asMutable<Timestamp>();
70-
char* rawValues = reinterpret_cast<char*>(rawValues_);
71-
72-
vector_size_t rowIndex = 0;
73-
auto nextRow = rows[rowIndex];
74-
bool moveNulls = shouldMoveNulls(rows);
75-
bool emptyOutputRows = outputRows_.size() == 0;
76-
for (size_t i = 0; i < numValues_; i++) {
77-
if (!emptyOutputRows && outputRows_[i] < nextRow) {
78-
continue;
79-
}
80-
VELOX_DCHECK(emptyOutputRows || (outputRows_[i] == nextRow));
81-
82-
// Convert the timestamp into seconds and nanos since the Unix epoch,
83-
// 00:00:00.000000 on 1 January 1970.
84-
uint64_t nanos;
85-
memcpy(&nanos, rawValues + nextRow * sizeof(int96_t), sizeof(uint64_t));
86-
int32_t days;
87-
memcpy(
88-
&days,
89-
rawValues + nextRow * sizeof(int96_t) + sizeof(uint64_t),
90-
sizeof(int32_t));
91-
valuesPtr[rowIndex] = Timestamp(
92-
(days - JULIAN_TO_UNIX_EPOCH_DAYS) * SECONDS_PER_DAY, nanos);
93-
94-
if (moveNulls && rowIndex != i) {
95-
bits::setBit(
96-
rawResultNulls_, rowIndex, bits::isBitSet(rawResultNulls_, i));
97-
}
98-
if (!emptyOutputRows) {
99-
outputRows_[rowIndex] = nextRow;
100-
}
101-
rowIndex++;
102-
if (rowIndex >= rows.size()) {
103-
break;
104-
}
105-
nextRow = rows[rowIndex];
106-
}
107-
108-
BufferPtr nulls = anyNulls_
109-
? (returnReaderNulls_ ? nullsInReadRange_ : resultNulls_)
110-
: nullptr;
111-
112-
*result = std::make_shared<FlatVector<Timestamp>>(
113-
&memoryPool_,
114-
type,
115-
nulls,
116-
rows.size(),
117-
tsValues,
118-
std::move(stringBuffers_));
119-
}
12044
};
12145

12246
} // namespace facebook::velox::parquet

velox/type/Type.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,9 @@
4646
namespace facebook::velox {
4747

4848
using int128_t = __int128_t;
49-
struct int96_t {
50-
int32_t val[3];
49+
/// In-memory image of a 12-byte INT96 timestamp value.
///
/// The struct is packed so that sizeof(int96_t) is exactly 12 with no
/// padding. Members are declared in the same order in which the INT96 reader
/// copies them out of the raw bytes: the 8-byte nanosecond field at offset 0,
/// followed by the 4-byte day number at offset 8. Overlaying this struct on
/// raw INT96 bytes therefore yields the same field values as the reader's
/// memcpy-based decoding.
struct __attribute__((__packed__)) int96_t {
  /// Nanosecond field; stored first (bytes 0-7).
  uint64_t nanos;
  /// Day number (the reader treats it as a Julian day); stored last
  /// (bytes 8-11).
  int32_t days;
};
5253

5354
/// Velox type system supports a small set of SQL-compatible composeable types:

0 commit comments

Comments
 (0)