Skip to content

Commit 20d4a17

Browse files
committed
PARQUET-741: Always allocate fresh buffers while compressing
Introduces another allocation at the cost of an actually working compression path. Also extended the column-writer test to write several columns. Author: Uwe L. Korn <uwelk@xhochy.com> Author: Korn, Uwe <Uwe.Korn@blue-yonder.com> Closes apache#173 from xhochy/PARQUET-741 and squashes the following commits: ce46816 [Uwe L. Korn] Use emplace_back to get rid of the shared_ptr 0d2f041 [Uwe L. Korn] Fix signed comparison ac1ccf0 [Uwe L. Korn] Minor style fixes 4cb03f8 [Uwe L. Korn] Fix FLBA tests a559123 [Korn, Uwe] PARQUET-741: Always allocate fresh buffers while compressing Change-Id: Id96866f012fa0536176d82ddcfc984ef9bf8a4eb
1 parent 53958b1 commit 20d4a17

File tree

3 files changed

+78
-21
lines changed

3 files changed

+78
-21
lines changed

cpp/src/parquet/column/column-writer-test.cc

Lines changed: 73 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include "parquet/file/reader-internal.h"
2626
#include "parquet/file/writer-internal.h"
2727
#include "parquet/types.h"
28+
#include "parquet/util/comparison.h"
2829
#include "parquet/util/input.h"
2930
#include "parquet/util/output.h"
3031

@@ -38,7 +39,7 @@ namespace test {
3839
// The default size used in most tests.
3940
const int SMALL_SIZE = 100;
4041
// Larger size to test some corner cases, only used in some specific cases.
41-
const int LARGE_SIZE = 10000;
42+
const int LARGE_SIZE = 100000;
4243
// Very large size to test dictionary fallback.
4344
const int VERY_LARGE_SIZE = 400000;
4445

@@ -97,26 +98,37 @@ class TestPrimitiveWriter : public PrimitiveTypedTest<TestType> {
9798
this->SyncValuesOut();
9899
}
99100

101+
void ReadColumnFully(Compression::type compression = Compression::UNCOMPRESSED);
102+
100103
void TestRequiredWithEncoding(Encoding::type encoding) {
101104
return TestRequiredWithSettings(encoding, Compression::UNCOMPRESSED, false, false);
102105
}
103106

104107
void TestRequiredWithSettings(Encoding::type encoding, Compression::type compression,
105-
bool enable_dictionary, bool enable_statistics) {
106-
this->GenerateData(SMALL_SIZE);
108+
bool enable_dictionary, bool enable_statistics, int64_t num_rows = SMALL_SIZE) {
109+
this->GenerateData(num_rows);
107110

108111
// Test case 1: required and non-repeated, so no definition or repetition levels
109112
ColumnProperties column_properties(
110113
encoding, compression, enable_dictionary, enable_statistics);
111114
std::shared_ptr<TypedColumnWriter<TestType>> writer =
112-
this->BuildWriter(SMALL_SIZE, column_properties);
115+
this->BuildWriter(num_rows, column_properties);
113116
writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_ptr_);
114117
// The behaviour should be independent from the number of Close() calls
115118
writer->Close();
116119
writer->Close();
117120

118-
this->ReadColumn(compression);
119-
ASSERT_EQ(SMALL_SIZE, this->values_read_);
121+
this->SetupValuesOut(num_rows);
122+
this->ReadColumnFully(compression);
123+
Compare<T> compare(this->descr_);
124+
for (size_t i = 0; i < this->values_.size(); i++) {
125+
if (compare(this->values_[i], this->values_out_[i]) ||
126+
compare(this->values_out_[i], this->values_[i])) {
127+
std::cout << "Failed at " << i << std::endl;
128+
}
129+
ASSERT_FALSE(compare(this->values_[i], this->values_out_[i]));
130+
ASSERT_FALSE(compare(this->values_out_[i], this->values_[i]));
131+
}
120132
ASSERT_EQ(this->values_, this->values_out_);
121133
}
122134

@@ -154,8 +166,53 @@ class TestPrimitiveWriter : public PrimitiveTypedTest<TestType> {
154166
std::unique_ptr<ColumnChunkMetaDataBuilder> metadata_;
155167
std::unique_ptr<InMemoryOutputStream> sink_;
156168
std::shared_ptr<WriterProperties> writer_properties_;
169+
std::vector<std::vector<uint8_t>> data_buffer_;
157170
};
158171

172+
template <typename TestType>
173+
void TestPrimitiveWriter<TestType>::ReadColumnFully(Compression::type compression) {
174+
BuildReader(compression);
175+
values_read_ = 0;
176+
while (values_read_ < static_cast<int64_t>(this->values_out_.size())) {
177+
int64_t values_read_recently = 0;
178+
reader_->ReadBatch(this->values_out_.size() - values_read_,
179+
definition_levels_out_.data() + values_read_,
180+
repetition_levels_out_.data() + values_read_,
181+
this->values_out_ptr_ + values_read_, &values_read_recently);
182+
values_read_ += values_read_recently;
183+
}
184+
this->SyncValuesOut();
185+
}
186+
187+
template <>
188+
void TestPrimitiveWriter<FLBAType>::ReadColumnFully(Compression::type compression) {
189+
BuildReader(compression);
190+
this->data_buffer_.clear();
191+
192+
values_read_ = 0;
193+
while (values_read_ < static_cast<int64_t>(this->values_out_.size())) {
194+
int64_t values_read_recently = 0;
195+
reader_->ReadBatch(this->values_out_.size() - values_read_,
196+
definition_levels_out_.data() + values_read_,
197+
repetition_levels_out_.data() + values_read_,
198+
this->values_out_ptr_ + values_read_, &values_read_recently);
199+
200+
// Copy contents of the pointers
201+
std::vector<uint8_t> data(values_read_recently * this->descr_->type_length());
202+
uint8_t* data_ptr = data.data();
203+
for (int64_t i = 0; i < values_read_recently; i++) {
204+
memcpy(data_ptr + this->descr_->type_length() * i,
205+
this->values_out_[i + values_read_].ptr, this->descr_->type_length());
206+
this->values_out_[i + values_read_].ptr =
207+
data_ptr + this->descr_->type_length() * i;
208+
}
209+
data_buffer_.emplace_back(std::move(data));
210+
211+
values_read_ += values_read_recently;
212+
}
213+
this->SyncValuesOut();
214+
}
215+
159216
typedef ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType,
160217
BooleanType, ByteArrayType, FLBAType> TestTypes;
161218

@@ -198,23 +255,28 @@ TYPED_TEST(TestPrimitiveWriter, RequiredRLEDictionary) {
198255
*/
199256

200257
TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithSnappyCompression) {
201-
this->TestRequiredWithSettings(Encoding::PLAIN, Compression::SNAPPY, false, false);
258+
this->TestRequiredWithSettings(
259+
Encoding::PLAIN, Compression::SNAPPY, false, false, LARGE_SIZE);
202260
}
203261

204262
TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithGzipCompression) {
205-
this->TestRequiredWithSettings(Encoding::PLAIN, Compression::GZIP, false, false);
263+
this->TestRequiredWithSettings(
264+
Encoding::PLAIN, Compression::GZIP, false, false, LARGE_SIZE);
206265
}
207266

208267
TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStats) {
209-
this->TestRequiredWithSettings(Encoding::PLAIN, Compression::UNCOMPRESSED, false, true);
268+
this->TestRequiredWithSettings(
269+
Encoding::PLAIN, Compression::UNCOMPRESSED, false, true, LARGE_SIZE);
210270
}
211271

212272
TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStatsAndSnappyCompression) {
213-
this->TestRequiredWithSettings(Encoding::PLAIN, Compression::SNAPPY, false, true);
273+
this->TestRequiredWithSettings(
274+
Encoding::PLAIN, Compression::SNAPPY, false, true, LARGE_SIZE);
214275
}
215276

216277
TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStatsAndGzipCompression) {
217-
this->TestRequiredWithSettings(Encoding::PLAIN, Compression::GZIP, false, true);
278+
this->TestRequiredWithSettings(
279+
Encoding::PLAIN, Compression::GZIP, false, true, LARGE_SIZE);
218280
}
219281

220282
TYPED_TEST(TestPrimitiveWriter, Optional) {

cpp/src/parquet/file/writer-internal.cc

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,7 @@ SerializedPageWriter::SerializedPageWriter(OutputStream* sink, Compression::type
4141
dictionary_page_offset_(0),
4242
data_page_offset_(0),
4343
total_uncompressed_size_(0),
44-
total_compressed_size_(0),
45-
compression_buffer_(std::make_shared<OwnedMutableBuffer>(0, allocator)) {
44+
total_compressed_size_(0) {
4645
compressor_ = Codec::Create(codec);
4746
}
4847

@@ -72,11 +71,11 @@ std::shared_ptr<Buffer> SerializedPageWriter::Compress(
7271
// Compress the data
7372
int64_t max_compressed_size =
7473
compressor_->MaxCompressedLen(buffer->size(), buffer->data());
75-
compression_buffer_->Resize(max_compressed_size);
74+
auto compression_buffer = std::make_shared<OwnedMutableBuffer>(max_compressed_size);
7675
int64_t compressed_size = compressor_->Compress(buffer->size(), buffer->data(),
77-
max_compressed_size, compression_buffer_->mutable_data());
78-
compression_buffer_->Resize(compressed_size);
79-
return compression_buffer_;
76+
max_compressed_size, compression_buffer->mutable_data());
77+
compression_buffer->Resize(compressed_size);
78+
return compression_buffer;
8079
}
8180

8281
int64_t SerializedPageWriter::WriteDataPage(const CompressedDataPage& page) {

cpp/src/parquet/file/writer-internal.h

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,6 @@ class SerializedPageWriter : public PageWriter {
4646

4747
/**
4848
* Compress a buffer.
49-
*
50-
* This method may return compression_buffer_ and thus the resulting memory
51-
* is only valid until the next call to Compress().
5249
*/
5350
std::shared_ptr<Buffer> Compress(const std::shared_ptr<Buffer>& buffer) override;
5451

@@ -65,7 +62,6 @@ class SerializedPageWriter : public PageWriter {
6562

6663
// Compression codec to use.
6764
std::unique_ptr<Codec> compressor_;
68-
std::shared_ptr<OwnedMutableBuffer> compression_buffer_;
6965
};
7066

7167
// RowGroupWriter::Contents implementation for the Parquet file specification

0 commit comments

Comments
 (0)