Skip to content

Commit adcabc4

Browse files
Deepak Majeti authored and wesm committed
PARQUET-687: C++: Switch to PLAIN encoding if dictionary grows too large
Implemented dictionary fallback encoding Added tests Added a fast path to serialize data pages Author: Deepak Majeti <deepak.majeti@hpe.com> Closes apache#157 from majetideepak/PARQUET-717 and squashes the following commits: 6f51df6 [Deepak Majeti] minor comment fix c498aeb [Deepak Majeti] modify comment style eac9114 [Deepak Majeti] clang format da46033 [Deepak Majeti] added comments and fixed review suggestions 312bad8 [Deepak Majeti] minor changes dd0cc7e [Deepak Majeti] Add all types to the test 54af38a [Deepak Majeti] clang format 84f360d [Deepak Majeti] added dictionary fallback support with tests Change-Id: I4b28ec4ba66389d06890b90a7f625887d1c6459f
1 parent 10ebdbd commit adcabc4

File tree

11 files changed

+192
-64
lines changed

11 files changed

+192
-64
lines changed

cpp/src/parquet/column/column-writer-test.cc

Lines changed: 47 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ namespace test {
3939
const int SMALL_SIZE = 100;
4040
// Larger size to test some corner cases, only used in some specific cases.
4141
const int LARGE_SIZE = 10000;
42+
// Very large size to test dictionary fallback.
43+
const int VERY_LARGE_SIZE = 400000;
4244

4345
template <typename TestType>
4446
class TestPrimitiveWriter : public ::testing::Test {
@@ -74,10 +76,10 @@ class TestPrimitiveWriter : public ::testing::Test {
7476
repetition_levels_out_.resize(SMALL_SIZE);
7577

7678
SetUpSchemaRequired();
77-
metadata_accessor_ =
78-
ColumnChunkMetaData::Make(reinterpret_cast<uint8_t*>(&thrift_metadata_));
7979
}
8080

81+
Type::type type_num() { return TestType::type_num; }
82+
8183
void BuildReader() {
8284
auto buffer = sink_->GetBuffer();
8385
std::unique_ptr<InMemoryInputStream> source(new InMemoryInputStream(buffer));
@@ -130,7 +132,23 @@ class TestPrimitiveWriter : public ::testing::Test {
130132
ASSERT_EQ(this->values_, this->values_out_);
131133
}
132134

133-
int64_t metadata_num_values() const { return metadata_accessor_->num_values(); }
135+
int64_t metadata_num_values() {
136+
// Metadata accessor must be created lazily.
137+
// This is because the ColumnChunkMetaData semantics dictate the metadata object is
138+
// complete (no changes to the metadata buffer can be made after instantiation)
139+
auto metadata_accessor =
140+
ColumnChunkMetaData::Make(reinterpret_cast<const uint8_t*>(&thrift_metadata_));
141+
return metadata_accessor->num_values();
142+
}
143+
144+
std::vector<Encoding::type> metadata_encodings() {
145+
// Metadata accessor must be created lazily.
146+
// This is because the ColumnChunkMetaData semantics dictate the metadata object is
147+
// complete (no changes to the metadata buffer can be made after instantiation)
148+
auto metadata_accessor =
149+
ColumnChunkMetaData::Make(reinterpret_cast<const uint8_t*>(&thrift_metadata_));
150+
return metadata_accessor->encodings();
151+
}
134152

135153
protected:
136154
int64_t values_read_;
@@ -156,7 +174,6 @@ class TestPrimitiveWriter : public ::testing::Test {
156174
NodePtr node_;
157175
format::ColumnChunk thrift_metadata_;
158176
std::unique_ptr<ColumnChunkMetaDataBuilder> metadata_;
159-
std::unique_ptr<ColumnChunkMetaData> metadata_accessor_;
160177
std::shared_ptr<ColumnDescriptor> schema_;
161178
std::unique_ptr<InMemoryOutputStream> sink_;
162179
std::shared_ptr<WriterProperties> writer_properties_;
@@ -334,5 +351,31 @@ TYPED_TEST(TestPrimitiveWriter, RequiredLargeChunk) {
334351
ASSERT_EQ(this->values_, this->values_out_);
335352
}
336353

354+
// Test case for dictionary fallback encoding
355+
TYPED_TEST(TestPrimitiveWriter, RequiredVeryLargeChunk) {
356+
this->GenerateData(VERY_LARGE_SIZE);
357+
358+
auto writer = this->BuildWriter(VERY_LARGE_SIZE, Encoding::PLAIN_DICTIONARY);
359+
writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_ptr_);
360+
writer->Close();
361+
362+
// Just read the first SMALL_SIZE rows to ensure we could read it back in
363+
this->ReadColumn();
364+
ASSERT_EQ(SMALL_SIZE, this->values_read_);
365+
this->values_.resize(SMALL_SIZE);
366+
ASSERT_EQ(this->values_, this->values_out_);
367+
std::vector<Encoding::type> encodings = this->metadata_encodings();
368+
// There are 3 encodings (RLE, PLAIN_DICTIONARY, PLAIN) in a fallback case
369+
// Dictionary encoding is not allowed for boolean type
370+
// There are 2 encodings (RLE, PLAIN) in a non dictionary encoding case
371+
ASSERT_EQ(Encoding::RLE, encodings[0]);
372+
if (this->type_num() != Type::BOOLEAN) {
373+
ASSERT_EQ(Encoding::PLAIN_DICTIONARY, encodings[1]);
374+
ASSERT_EQ(Encoding::PLAIN, encodings[2]);
375+
} else {
376+
ASSERT_EQ(Encoding::PLAIN, encodings[1]);
377+
}
378+
}
379+
337380
} // namespace test
338381
} // namespace parquet

cpp/src/parquet/column/page.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,10 @@ class PageWriter {
171171
public:
172172
virtual ~PageWriter() {}
173173

174-
virtual void Close() = 0;
174+
// The Column Writer decides if dictionary encoding is used if set and
175+
// if the dictionary encoding has fallen back to default encoding on reaching dictionary
176+
// page limit
177+
virtual void Close(bool has_dictionary, bool fallback) = 0;
175178

176179
virtual int64_t WriteDataPage(const DataPage& page) = 0;
177180

cpp/src/parquet/column/properties-test.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ TEST(TestWriterProperties, Basics) {
3737
std::shared_ptr<WriterProperties> props = WriterProperties::Builder().build();
3838

3939
ASSERT_EQ(DEFAULT_PAGE_SIZE, props->data_pagesize());
40-
ASSERT_EQ(DEFAULT_DICTIONARY_PAGE_SIZE, props->dictionary_pagesize());
40+
ASSERT_EQ(DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT, props->dictionary_pagesize_limit());
4141
ASSERT_EQ(DEFAULT_WRITER_VERSION, props->version());
4242
}
4343

cpp/src/parquet/column/properties.h

Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,8 @@ ReaderProperties PARQUET_EXPORT default_reader_properties();
8080

8181
static constexpr int64_t DEFAULT_PAGE_SIZE = 1024 * 1024;
8282
static constexpr bool DEFAULT_IS_DICTIONARY_ENABLED = true;
83-
static constexpr int64_t DEFAULT_DICTIONARY_PAGE_SIZE = DEFAULT_PAGE_SIZE;
83+
static constexpr int64_t DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT = DEFAULT_PAGE_SIZE;
84+
static constexpr int64_t DEFAULT_WRITE_BATCH_SIZE = 1024;
8485
static constexpr Encoding::type DEFAULT_ENCODING = Encoding::PLAIN;
8586
static constexpr ParquetVersion::type DEFAULT_WRITER_VERSION =
8687
ParquetVersion::PARQUET_1_0;
@@ -96,7 +97,8 @@ class PARQUET_EXPORT WriterProperties {
9697
Builder()
9798
: allocator_(default_allocator()),
9899
dictionary_enabled_default_(DEFAULT_IS_DICTIONARY_ENABLED),
99-
dictionary_pagesize_(DEFAULT_DICTIONARY_PAGE_SIZE),
100+
dictionary_pagesize_limit_(DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT),
101+
write_batch_size_(DEFAULT_WRITE_BATCH_SIZE),
100102
pagesize_(DEFAULT_PAGE_SIZE),
101103
version_(DEFAULT_WRITER_VERSION),
102104
created_by_(DEFAULT_CREATED_BY),
@@ -137,8 +139,13 @@ class PARQUET_EXPORT WriterProperties {
137139
return this->enable_dictionary(path->ToDotString());
138140
}
139141

140-
Builder* dictionary_pagesize(int64_t dictionary_psize) {
141-
dictionary_pagesize_ = dictionary_psize;
142+
Builder* dictionary_pagesize_limit(int64_t dictionary_psize_limit) {
143+
dictionary_pagesize_limit_ = dictionary_psize_limit;
144+
return this;
145+
}
146+
147+
Builder* write_batch_size(int64_t write_batch_size) {
148+
write_batch_size_ = write_batch_size;
142149
return this;
143150
}
144151

@@ -214,17 +221,18 @@ class PARQUET_EXPORT WriterProperties {
214221
}
215222

216223
std::shared_ptr<WriterProperties> build() {
217-
return std::shared_ptr<WriterProperties>(
218-
new WriterProperties(allocator_, dictionary_enabled_default_,
219-
dictionary_enabled_, dictionary_pagesize_, pagesize_, version_, created_by_,
220-
default_encoding_, encodings_, default_codec_, codecs_));
224+
return std::shared_ptr<WriterProperties>(new WriterProperties(allocator_,
225+
dictionary_enabled_default_, dictionary_enabled_, dictionary_pagesize_limit_,
226+
write_batch_size_, pagesize_, version_, created_by_, default_encoding_,
227+
encodings_, default_codec_, codecs_));
221228
}
222229

223230
private:
224231
MemoryAllocator* allocator_;
225232
bool dictionary_enabled_default_;
226233
std::unordered_map<std::string, bool> dictionary_enabled_;
227-
int64_t dictionary_pagesize_;
234+
int64_t dictionary_pagesize_limit_;
235+
int64_t write_batch_size_;
228236
int64_t pagesize_;
229237
ParquetVersion::type version_;
230238
std::string created_by_;
@@ -246,7 +254,9 @@ class PARQUET_EXPORT WriterProperties {
246254
return dictionary_enabled_default_;
247255
}
248256

249-
inline int64_t dictionary_pagesize() const { return dictionary_pagesize_; }
257+
inline int64_t dictionary_pagesize_limit() const { return dictionary_pagesize_limit_; }
258+
259+
inline int64_t write_batch_size() const { return write_batch_size_; }
250260

251261
inline int64_t data_pagesize() const { return pagesize_; }
252262

@@ -286,14 +296,16 @@ class PARQUET_EXPORT WriterProperties {
286296
private:
287297
explicit WriterProperties(MemoryAllocator* allocator, bool dictionary_enabled_default,
288298
std::unordered_map<std::string, bool> dictionary_enabled,
289-
int64_t dictionary_pagesize, int64_t pagesize, ParquetVersion::type version,
290-
const std::string& created_by, Encoding::type default_encoding,
299+
int64_t dictionary_pagesize, int64_t write_batch_size, int64_t pagesize,
300+
ParquetVersion::type version, const std::string& created_by,
301+
Encoding::type default_encoding,
291302
std::unordered_map<std::string, Encoding::type> encodings,
292303
Compression::type default_codec, const ColumnCodecs& codecs)
293304
: allocator_(allocator),
294305
dictionary_enabled_default_(dictionary_enabled_default),
295306
dictionary_enabled_(dictionary_enabled),
296-
dictionary_pagesize_(dictionary_pagesize),
307+
dictionary_pagesize_limit_(dictionary_pagesize),
308+
write_batch_size_(write_batch_size),
297309
pagesize_(pagesize),
298310
parquet_version_(version),
299311
parquet_created_by_(created_by),
@@ -304,7 +316,8 @@ class PARQUET_EXPORT WriterProperties {
304316
MemoryAllocator* allocator_;
305317
bool dictionary_enabled_default_;
306318
std::unordered_map<std::string, bool> dictionary_enabled_;
307-
int64_t dictionary_pagesize_;
319+
int64_t dictionary_pagesize_limit_;
320+
int64_t write_batch_size_;
308321
int64_t pagesize_;
309322
ParquetVersion::type parquet_version_;
310323
std::string parquet_created_by_;

cpp/src/parquet/column/writer.cc

Lines changed: 43 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,8 @@ ColumnWriter::ColumnWriter(const ColumnDescriptor* descr,
4747
num_buffered_encoded_values_(0),
4848
num_rows_(0),
4949
total_bytes_written_(0),
50-
closed_(false) {
50+
closed_(false),
51+
fallback_(false) {
5152
InitSinks();
5253
}
5354

@@ -118,7 +119,13 @@ void ColumnWriter::AddDataPage() {
118119
DataPage page(
119120
uncompressed_data, num_buffered_values_, encoding_, Encoding::RLE, Encoding::RLE);
120121

121-
data_pages_.push_back(std::move(page));
122+
// Write the page to OutputStream eagerly if there is no dictionary or
123+
// if dictionary encoding has fallen back to PLAIN
124+
if (has_dictionary_ && !fallback_) { // Save pages until end of dictionary encoding
125+
data_pages_.push_back(std::move(page));
126+
} else { // Eagerly write pages
127+
WriteDataPage(page);
128+
}
122129

123130
// Re-initialize the sinks as GetBuffer made them invalid.
124131
InitSinks();
@@ -134,52 +141,71 @@ void ColumnWriter::WriteDataPage(const DataPage& page) {
134141
int64_t ColumnWriter::Close() {
135142
if (!closed_) {
136143
closed_ = true;
137-
if (has_dictionary_) { WriteDictionaryPage(); }
138-
// Write all outstanding data to a new page
139-
if (num_buffered_values_ > 0) { AddDataPage(); }
144+
if (has_dictionary_ && !fallback_) { WriteDictionaryPage(); }
145+
146+
FlushBufferedDataPages();
140147

141-
for (size_t i = 0; i < data_pages_.size(); i++) {
142-
WriteDataPage(data_pages_[i]);
143-
}
148+
pager_->Close(has_dictionary_, fallback_);
144149
}
145150

146151
if (num_rows_ != expected_rows_) {
147152
throw ParquetException(
148-
"Less then the number of expected rows written in"
153+
"Less than the number of expected rows written in"
149154
" the current column chunk");
150155
}
151156

152-
pager_->Close();
153-
154157
return total_bytes_written_;
155158
}
156159

160+
void ColumnWriter::FlushBufferedDataPages() {
161+
// Write all outstanding data to a new page
162+
if (num_buffered_values_ > 0) { AddDataPage(); }
163+
for (size_t i = 0; i < data_pages_.size(); i++) {
164+
WriteDataPage(data_pages_[i]);
165+
}
166+
data_pages_.clear();
167+
}
168+
157169
// ----------------------------------------------------------------------
158170
// TypedColumnWriter
159171

160172
template <typename Type>
161-
TypedColumnWriter<Type>::TypedColumnWriter(const ColumnDescriptor* schema,
173+
TypedColumnWriter<Type>::TypedColumnWriter(const ColumnDescriptor* descr,
162174
std::unique_ptr<PageWriter> pager, int64_t expected_rows, Encoding::type encoding,
163175
const WriterProperties* properties)
164-
: ColumnWriter(schema, std::move(pager), expected_rows,
176+
: ColumnWriter(descr, std::move(pager), expected_rows,
165177
(encoding == Encoding::PLAIN_DICTIONARY ||
166178
encoding == Encoding::RLE_DICTIONARY),
167179
encoding, properties) {
168180
switch (encoding) {
169181
case Encoding::PLAIN:
170-
current_encoder_ = std::unique_ptr<EncoderType>(
171-
new PlainEncoder<Type>(schema, properties->allocator()));
182+
current_encoder_.reset(new PlainEncoder<Type>(descr, properties->allocator()));
172183
break;
173184
case Encoding::PLAIN_DICTIONARY:
174185
case Encoding::RLE_DICTIONARY:
175-
current_encoder_ = std::unique_ptr<EncoderType>(
176-
new DictEncoder<Type>(schema, &pool_, properties->allocator()));
186+
current_encoder_.reset(
187+
new DictEncoder<Type>(descr, &pool_, properties->allocator()));
177188
break;
178189
default:
179190
ParquetException::NYI("Selected encoding is not supported");
180191
}
181192
}
182193

194+
// Only one Dictionary Page is written.
195+
// Fallback to PLAIN if dictionary page limit is reached.
196+
template <typename Type>
197+
void TypedColumnWriter<Type>::CheckDictionarySizeLimit() {
198+
auto dict_encoder = static_cast<DictEncoder<Type>*>(current_encoder_.get());
199+
if (dict_encoder->dict_encoded_size() >= properties_->dictionary_pagesize_limit()) {
200+
WriteDictionaryPage();
201+
// Serialize the buffered Dictionary Indicies
202+
FlushBufferedDataPages();
203+
fallback_ = true;
204+
// Only PLAIN encoding is supported for fallback in V1
205+
current_encoder_.reset(new PlainEncoder<Type>(descr_, properties_->allocator()));
206+
}
207+
}
208+
183209
template <typename Type>
184210
void TypedColumnWriter<Type>::WriteDictionaryPage() {
185211
auto dict_encoder = static_cast<DictEncoder<Type>*>(current_encoder_.get());

0 commit comments

Comments
 (0)