Skip to content

Commit 2152bfc

Browse files
Deepak Majeti authored and julienledem committed
PARQUET-428: Support INT96 and FIXED_LEN_BYTE_ARRAY types
This PR adds support for INT96 and FIXED_LEN_BYTE_ARRAY types. It modifies the examples and DebugPrint to handle these types.

Author: Deepak Majeti <deepak.majeti@hp.com>

Closes apache#27 from majetideepak/master and squashes the following commits:

5ba0a03 [Deepak Majeti] PARQUET-428: Support INT96 and FIXED_LEN_BYTE_ARRAY types

Change-Id: I8c86350ae1245582dba6167b18f5cc92cd35e7f9
1 parent b1d44f1 commit 2152bfc

File tree

7 files changed

+120
-15
lines changed

7 files changed

+120
-15
lines changed

cpp/src/parquet/column_reader.cc

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -187,6 +187,8 @@ std::shared_ptr<ColumnReader> ColumnReader::Make(const parquet::ColumnMetaData*
187187
return std::make_shared<DoubleReader>(metadata, element, stream);
188188
case Type::BYTE_ARRAY:
189189
return std::make_shared<ByteArrayReader>(metadata, element, stream);
190+
case Type::FIXED_LEN_BYTE_ARRAY:
191+
return std::make_shared<FixedLenByteArrayReader>(metadata, element, stream);
190192
default:
191193
ParquetException::NYI("type reader not implemented");
192194
}

cpp/src/parquet/column_reader.h

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -159,6 +159,7 @@ typedef TypedColumnReader<parquet::Type::INT96> Int96Reader;
159159
typedef TypedColumnReader<parquet::Type::FLOAT> FloatReader;
160160
typedef TypedColumnReader<parquet::Type::DOUBLE> DoubleReader;
161161
typedef TypedColumnReader<parquet::Type::BYTE_ARRAY> ByteArrayReader;
162+
typedef TypedColumnReader<parquet::Type::FIXED_LEN_BYTE_ARRAY> FixedLenByteArrayReader;
162163

163164

164165
template <int TYPE>

cpp/src/parquet/encodings/dictionary-encoding.h

Lines changed: 19 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -111,6 +111,25 @@ inline void DictionaryDecoder<parquet::Type::BYTE_ARRAY>::Init(
111111
}
112112
}
113113

114+
template <>
115+
inline void DictionaryDecoder<parquet::Type::FIXED_LEN_BYTE_ARRAY>::Init(
116+
Decoder<parquet::Type::FIXED_LEN_BYTE_ARRAY>* dictionary) {
117+
int num_dictionary_values = dictionary->values_left();
118+
dictionary_.resize(num_dictionary_values);
119+
dictionary->Decode(&dictionary_[0], num_dictionary_values);
120+
121+
int fixed_len = schema_->type_length;
122+
int total_size = num_dictionary_values*fixed_len;
123+
124+
byte_array_data_.resize(total_size);
125+
int offset = 0;
126+
for (int i = 0; i < num_dictionary_values; ++i) {
127+
memcpy(&byte_array_data_[offset], dictionary_[i].ptr, fixed_len);
128+
dictionary_[i].ptr = &byte_array_data_[offset];
129+
offset += fixed_len;
130+
}
131+
}
132+
114133
} // namespace parquet_cpp
115134

116135
#endif

cpp/src/parquet/encodings/plain-encoding.h

Lines changed: 16 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -74,6 +74,22 @@ inline int PlainDecoder<parquet::Type::BYTE_ARRAY>::Decode(ByteArray* buffer,
7474
return max_values;
7575
}
7676

77+
// Template specialization for FIXED_LEN_BYTE_ARRAY
78+
template <>
79+
inline int PlainDecoder<parquet::Type::FIXED_LEN_BYTE_ARRAY>::Decode(FixedLenByteArray* buffer,
80+
int max_values) {
81+
max_values = std::min(max_values, num_values_);
82+
int len = schema_->type_length;
83+
for (int i = 0; i < max_values; ++i) {
84+
if (len_ < len) ParquetException::EofException();
85+
buffer[i].ptr = data_;
86+
data_ += len;
87+
len_ -= len;
88+
}
89+
num_values_ -= max_values;
90+
return max_values;
91+
}
92+
7793
template <>
7894
class PlainDecoder<parquet::Type::BOOLEAN> : public Decoder<parquet::Type::BOOLEAN> {
7995
public:

cpp/src/parquet/reader.cc

Lines changed: 25 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -217,6 +217,9 @@ static string parquet_type_to_string(Type::type t) {
217217
case Type::INT64:
218218
return "INT64";
219219
break;
220+
case Type::INT96:
221+
return "INT96";
222+
break;
220223
case Type::FLOAT:
221224
return "FLOAT";
222225
break;
@@ -226,9 +229,6 @@ static string parquet_type_to_string(Type::type t) {
226229
case Type::BYTE_ARRAY:
227230
return "BYTE_ARRAY";
228231
break;
229-
case Type::INT96:
230-
return "INT96";
231-
break;
232232
case Type::FIXED_LEN_BYTE_ARRAY:
233233
return "FIXED_LEN_BYTE_ARRAY";
234234
break;
@@ -239,7 +239,7 @@ static string parquet_type_to_string(Type::type t) {
239239
}
240240

241241
// the fixed initial size is just for an example
242-
#define COL_WIDTH "17"
242+
#define COL_WIDTH "20"
243243

244244
void ParquetFileReader::DebugPrint(std::ostream& stream, bool print_values) {
245245
if (!parsed_metadata_) {
@@ -251,10 +251,6 @@ void ParquetFileReader::DebugPrint(std::ostream& stream, bool print_values) {
251251
for (int c = 1; c < metadata_.schema.size(); ++c) {
252252
stream << "Column " << c-1 << ": " << metadata_.schema[c].name << " ("
253253
<< parquet_type_to_string(metadata_.schema[c].type);
254-
if (metadata_.schema[c].type == Type::INT96 ||
255-
metadata_.schema[c].type == Type::FIXED_LEN_BYTE_ARRAY) {
256-
stream << " - not supported";
257-
}
258254
stream << ")\n";
259255
}
260256

@@ -291,10 +287,6 @@ void ParquetFileReader::DebugPrint(std::ostream& stream, bool print_values) {
291287

292288
printf("%-" COL_WIDTH"s", metadata_.schema[c+1].name.c_str());
293289

294-
if (col_type == Type::INT96 || col_type == Type::FIXED_LEN_BYTE_ARRAY) {
295-
continue;
296-
}
297-
298290
// This is OK in this method as long as the RowGroupReader does not get deleted
299291
readers[c] = col_reader;
300292
}
@@ -345,6 +337,16 @@ void ParquetFileReader::DebugPrint(std::ostream& stream, bool print_values) {
345337
}
346338
break;
347339
}
340+
case Type::INT96: {
341+
Int96 val = reinterpret_cast<Int96Reader*>(readers[c])->NextValue(
342+
&def_level[c], &rep_level[c]);
343+
if (def_level[c] >= rep_level[c]) {
344+
string result = Int96ToString(val);
345+
snprintf(buffer, bufsize, "%-" COL_WIDTH"s", result.c_str());
346+
stream << buffer;
347+
}
348+
break;
349+
}
348350
case Type::FLOAT: {
349351
float val = reinterpret_cast<FloatReader*>(readers[c])->NextValue(
350352
&def_level[c], &rep_level[c]);
@@ -373,7 +375,17 @@ void ParquetFileReader::DebugPrint(std::ostream& stream, bool print_values) {
373375
}
374376
break;
375377
}
376-
default:
378+
case Type::FIXED_LEN_BYTE_ARRAY: {
379+
FixedLenByteArray val = reinterpret_cast<FixedLenByteArrayReader*>(
380+
readers[c])->NextValue(&def_level[c], &rep_level[c]);
381+
if (def_level[c] >= rep_level[c]) {
382+
string result = FixedLenByteArrayToString(val, metadata_.schema[c+1].type_length);
383+
snprintf(buffer, bufsize, "%-" COL_WIDTH"s", result.c_str());
384+
stream << buffer;
385+
}
386+
break;
387+
}
388+
default:
377389
continue;
378390
}
379391
}

cpp/src/parquet/types.h

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,10 @@
2222
#include <cstdint>
2323
#include <cstring>
2424
#include <string>
25+
#include <sstream>
2526

2627
#include "parquet/thrift/parquet_types.h"
28+
#include "parquet/util/compiler-util.h"
2729

2830
namespace parquet_cpp {
2931

@@ -32,11 +34,36 @@ struct ByteArray {
3234
const uint8_t* ptr;
3335
};
3436

37+
struct FixedLenByteArray {
38+
const uint8_t* ptr;
39+
};
40+
41+
MANUALLY_ALIGNED_STRUCT(1) Int96 {
42+
uint32_t value[3];
43+
};
44+
STRUCT_END(Int96, 12);
3545

3646
static inline std::string ByteArrayToString(const ByteArray& a) {
3747
return std::string(reinterpret_cast<const char*>(a.ptr), a.len);
3848
}
3949

50+
static inline std::string Int96ToString(const Int96& a) {
51+
std::stringstream result;
52+
for (int i = 0; i < 3; i++) {
53+
result << a.value[i] << " ";
54+
}
55+
return result.str();
56+
}
57+
58+
static inline std::string FixedLenByteArrayToString(const FixedLenByteArray& a, int len) {
59+
const uint8_t *bytes = reinterpret_cast<const uint8_t*>(a.ptr);
60+
std::stringstream result;
61+
for (int i = 0; i < len; i++) {
62+
result << (uint32_t)bytes[i] << " ";
63+
}
64+
return result.str();
65+
}
66+
4067
static inline int ByteCompare(const ByteArray& x1, const ByteArray& x2) {
4168
int len = std::min(x1.len, x2.len);
4269
int cmp = memcmp(x1.ptr, x2.ptr, len);
@@ -76,8 +103,7 @@ struct type_traits<parquet::Type::INT64> {
76103

77104
template <>
78105
struct type_traits<parquet::Type::INT96> {
79-
// TODO
80-
typedef void* value_type;
106+
typedef Int96 value_type;
81107
static constexpr parquet::Type::type parquet_type = parquet::Type::INT96;
82108

83109
static constexpr size_t value_byte_size = 12;
@@ -107,6 +133,14 @@ struct type_traits<parquet::Type::BYTE_ARRAY> {
107133
static constexpr size_t value_byte_size = sizeof(ByteArray);
108134
};
109135

136+
template <>
137+
struct type_traits<parquet::Type::FIXED_LEN_BYTE_ARRAY> {
138+
typedef FixedLenByteArray value_type;
139+
static constexpr parquet::Type::type parquet_type = parquet::Type::FIXED_LEN_BYTE_ARRAY;
140+
141+
static constexpr size_t value_byte_size = sizeof(FixedLenByteArray);
142+
};
143+
110144
} // namespace parquet_cpp
111145

112146
#endif // PARQUET_TYPES_H

cpp/src/parquet/util/compiler-util.h

Lines changed: 21 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -36,4 +36,25 @@
3636

3737
#define PREFETCH(addr) __builtin_prefetch(addr)
3838

39+
//macros to disable padding
40+
//these macros are portable across different compilers and platforms
41+
//[https://github.com/google/flatbuffers/blob/master/include/flatbuffers/flatbuffers.h#L1355]
42+
#if defined(_MSC_VER)
43+
#define MANUALLY_ALIGNED_STRUCT(alignment) \
44+
__pragma(pack(1)); \
45+
struct __declspec(align(alignment))
46+
#define STRUCT_END(name, size) \
47+
__pragma(pack()); \
48+
static_assert(sizeof(name) == size, "compiler breaks packing rules")
49+
#elif defined(__GNUC__) || defined(__clang__)
50+
#define MANUALLY_ALIGNED_STRUCT(alignment) \
51+
_Pragma("pack(1)") \
52+
struct __attribute__((aligned(alignment)))
53+
#define STRUCT_END(name, size) \
54+
_Pragma("pack()") \
55+
static_assert(sizeof(name) == size, "compiler breaks packing rules")
56+
#else
57+
#error Unknown compiler, please define structure alignment macros
58+
#endif
59+
3960
#endif // PARQUET_UTIL_COMPILER_UTIL_H

0 commit comments

Comments
 (0)