Skip to content

Commit 9aae125

Browse files
Deepak Majeti authored and wesm committed
PARQUET-731: API to return metadata size and Skip reading values
Author: Deepak Majeti <deepak.majeti@hpe.com>. Closes apache#169 from majetideepak/PARQUET-731 and squashes the following commits: 15e539f [Deepak Majeti] use allocator and smaller memory footprint; 3504edd [Deepak Majeti] clang format; 25a4bc1 [Deepak Majeti] Added tests; 1aeb8f5 [Deepak Majeti] API to skip values; 343af37 [Deepak Majeti] Added API to get metadata size. Change-Id: Ifd819be8968575a641448e12675d5cf6a2b37dd9
1 parent c9c7f4a commit 9aae125

File tree

6 files changed

+113
-7
lines changed

6 files changed

+113
-7
lines changed

cpp/src/parquet/arrow/arrow-reader-writer-benchmark.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,10 @@
1919

2020
#include "parquet/arrow/reader.h"
2121
#include "parquet/arrow/writer.h"
22-
#include "parquet/file/reader-internal.h"
23-
#include "parquet/file/writer-internal.h"
2422
#include "parquet/column/reader.h"
2523
#include "parquet/column/writer.h"
24+
#include "parquet/file/reader-internal.h"
25+
#include "parquet/file/writer-internal.h"
2626
#include "parquet/util/input.h"
2727

2828
#include "arrow/column.h"

cpp/src/parquet/column/column-reader-test.cc

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,64 @@ TEST_F(TestPrimitiveReader, TestInt32FlatRepeated) {
151151
ExecuteDict(num_pages, levels_per_page, &descr);
152152
}
153153

154+
// Exercises Int32Reader::Skip on a flat REQUIRED INT32 column made of
// 5 PLAIN-encoded pages with 100 levels each. After every Skip call, half a
// page is read back and compared with the matching slice of values_.
//
// All offsets are computed with integer arithmetic (half_page) rather than
// floating-point factors such as 2.5 * levels_per_page, so that no implicit
// double-to-integer conversion is involved in iterator math or in the
// comparison against the skipped-level count.
TEST_F(TestPrimitiveReader, TestInt32FlatRequiredSkip) {
  int levels_per_page = 100;
  int num_pages = 5;
  const int half_page = levels_per_page / 2;
  max_def_level_ = 0;
  max_rep_level_ = 0;
  NodePtr type = schema::Int32("b", Repetition::REQUIRED);
  const ColumnDescriptor descr(type, max_def_level_, max_rep_level_);
  MakePages<Int32Type>(&descr, num_pages, levels_per_page, def_levels_, rep_levels_,
      values_, data_buffer_, pages_, Encoding::PLAIN);
  InitReader(&descr);
  vector<int32_t> vresult(half_page, -1);
  vector<int16_t> dresult(half_page, -1);
  vector<int16_t> rresult(half_page, -1);

  Int32Reader* reader = static_cast<Int32Reader*>(reader_.get());
  int64_t values_read = 0;

  // 1) skip_size > page_size (multiple pages skipped)
  // Skip first 2 pages
  int64_t levels_skipped = reader->Skip(2 * levels_per_page);
  ASSERT_EQ(2 * levels_per_page, levels_skipped);
  // Read half a page: expect values [2 pages, 2.5 pages)
  reader->ReadBatch(
      half_page, dresult.data(), rresult.data(), vresult.data(), &values_read);
  vector<int32_t> sub_values(values_.begin() + 2 * levels_per_page,
      values_.begin() + 2 * levels_per_page + half_page);
  ASSERT_TRUE(vector_equal(sub_values, vresult));

  // 2) skip_size == page_size (skip across two pages)
  levels_skipped = reader->Skip(levels_per_page);
  ASSERT_EQ(levels_per_page, levels_skipped);
  // Read half a page: expect values [3.5 pages, 4 pages)
  reader->ReadBatch(
      half_page, dresult.data(), rresult.data(), vresult.data(), &values_read);
  sub_values.clear();
  sub_values.insert(sub_values.end(),
      values_.begin() + 3 * levels_per_page + half_page,
      values_.begin() + 4 * levels_per_page);
  ASSERT_TRUE(vector_equal(sub_values, vresult));

  // 3) skip_size < page_size (skip limited to a single page)
  // Skip half a page
  levels_skipped = reader->Skip(half_page);
  ASSERT_EQ(half_page, levels_skipped);
  // Read half a page: expect values [4.5 pages, end)
  reader->ReadBatch(
      half_page, dresult.data(), rresult.data(), vresult.data(), &values_read);
  sub_values.clear();
  sub_values.insert(sub_values.end(),
      values_.begin() + 4 * levels_per_page + half_page, values_.end());
  ASSERT_TRUE(vector_equal(sub_values, vresult));

  // Release the fixture state populated by this test.
  values_.clear();
  def_levels_.clear();
  rep_levels_.clear();
  pages_.clear();
  reader_.reset();
}
211+
154212
TEST_F(TestPrimitiveReader, TestDictionaryEncodedPages) {
155213
max_def_level_ = 0;
156214
max_rep_level_ = 0;

cpp/src/parquet/column/reader.h

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include <cstring>
2424
#include <memory>
2525
#include <unordered_map>
26+
#include <vector>
2627

2728
#include "parquet/column/levels.h"
2829
#include "parquet/column/page.h"
@@ -124,8 +125,12 @@ class PARQUET_EXPORT TypedColumnReader : public ColumnReader {
124125
// This API is the same for both V1 and V2 of the DataPage
125126
//
126127
// @returns: actual number of levels read (see values_read for number of values read)
127-
int64_t ReadBatch(int32_t batch_size, int16_t* def_levels, int16_t* rep_levels,
128-
T* values, int64_t* values_read);
128+
int64_t ReadBatch(int batch_size, int16_t* def_levels, int16_t* rep_levels, T* values,
129+
int64_t* values_read);
130+
131+
// Skip reading levels
132+
// Returns the number of levels skipped
133+
int64_t Skip(int64_t num_rows_to_skip);
129134

130135
private:
131136
typedef Decoder<DType> DecoderType;
@@ -166,7 +171,7 @@ inline int64_t TypedColumnReader<DType>::ReadBatch(int batch_size, int16_t* def_
166171

167172
// TODO(wesm): keep reading data pages until batch_size is reached, or the
168173
// row group is finished
169-
batch_size = std::min(batch_size, num_buffered_values_);
174+
batch_size = std::min(batch_size, num_buffered_values_ - num_decoded_values_);
170175

171176
int64_t num_def_levels = 0;
172177
int64_t num_rep_levels = 0;
@@ -201,6 +206,39 @@ inline int64_t TypedColumnReader<DType>::ReadBatch(int batch_size, int16_t* def_
201206
return total_values;
202207
}
203208

209+
template <typename DType>
210+
inline int64_t TypedColumnReader<DType>::Skip(int64_t num_rows_to_skip) {
211+
int64_t rows_to_skip = num_rows_to_skip;
212+
while (HasNext() && rows_to_skip > 0) {
213+
// If the number of rows to skip is more than the number of undecoded values, skip the
214+
// Page.
215+
if (rows_to_skip > (num_buffered_values_ - num_decoded_values_)) {
216+
rows_to_skip -= num_buffered_values_ - num_decoded_values_;
217+
num_decoded_values_ = num_buffered_values_;
218+
} else {
219+
// We need to read this Page
220+
// Jump to the right offset in the Page
221+
int64_t batch_size = 1024; // ReadBatch with a smaller memory footprint
222+
int64_t values_read = 0;
223+
auto vals = std::make_shared<OwnedMutableBuffer>(
224+
batch_size * type_traits<DType::type_num>::value_byte_size, this->allocator_);
225+
auto def_levels = std::make_shared<OwnedMutableBuffer>(
226+
batch_size * sizeof(int16_t), this->allocator_);
227+
auto rep_levels = std::make_shared<OwnedMutableBuffer>(
228+
batch_size * sizeof(int16_t), this->allocator_);
229+
do {
230+
batch_size = std::min(batch_size, rows_to_skip);
231+
values_read =
232+
ReadBatch(batch_size, reinterpret_cast<int16_t*>(def_levels->mutable_data()),
233+
reinterpret_cast<int16_t*>(rep_levels->mutable_data()),
234+
reinterpret_cast<T*>(vals->mutable_data()), &values_read);
235+
rows_to_skip -= values_read;
236+
} while (values_read > 0 && rows_to_skip > 0);
237+
}
238+
}
239+
return num_rows_to_skip - rows_to_skip;
240+
}
241+
204242
typedef TypedColumnReader<BooleanType> BoolReader;
205243
typedef TypedColumnReader<Int32Type> Int32Reader;
206244
typedef TypedColumnReader<Int64Type> Int64Reader;

cpp/src/parquet/file/file-metadata-test.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ TEST(Metadata, TestBuildAccess) {
8383

8484
// file metadata
8585
ASSERT_EQ(nrows, f_accessor->num_rows());
86+
ASSERT_LE(0, f_accessor->size());
8687
ASSERT_EQ(2, f_accessor->num_row_groups());
8788
ASSERT_EQ(DEFAULT_WRITER_VERSION, f_accessor->version());
8889
ASSERT_EQ(DEFAULT_CREATED_BY, f_accessor->created_by());

cpp/src/parquet/file/metadata.cc

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -229,15 +229,18 @@ std::unique_ptr<ColumnChunkMetaData> RowGroupMetaData::ColumnChunk(int i) const
229229
// file metadata
230230
class FileMetaData::FileMetaDataImpl {
231231
public:
232-
FileMetaDataImpl() {}
232+
FileMetaDataImpl() : metadata_len_(0) {}
233233

234-
explicit FileMetaDataImpl(const uint8_t* metadata, uint32_t* metadata_len) {
234+
explicit FileMetaDataImpl(const uint8_t* metadata, uint32_t* metadata_len)
235+
: metadata_len_(0) {
235236
metadata_.reset(new format::FileMetaData);
236237
DeserializeThriftMsg(metadata, metadata_len, metadata_.get());
238+
metadata_len_ = *metadata_len;
237239
InitSchema();
238240
}
239241
~FileMetaDataImpl() {}
240242

243+
inline uint32_t size() const { return metadata_len_; }
241244
inline int num_columns() const { return schema_.num_columns(); }
242245
inline int64_t num_rows() const { return metadata_->num_rows; }
243246
inline int num_row_groups() const { return metadata_->row_groups.size(); }
@@ -262,6 +265,7 @@ class FileMetaData::FileMetaDataImpl {
262265

263266
private:
264267
friend FileMetaDataBuilder;
268+
uint32_t metadata_len_;
265269
std::unique_ptr<format::FileMetaData> metadata_;
266270
void InitSchema() {
267271
schema::FlatSchemaConverter converter(
@@ -289,6 +293,10 @@ std::unique_ptr<RowGroupMetaData> FileMetaData::RowGroup(int i) const {
289293
return impl_->RowGroup(i);
290294
}
291295

296+
uint32_t FileMetaData::size() const {
297+
return impl_->size();
298+
}
299+
292300
int FileMetaData::num_columns() const {
293301
return impl_->num_columns();
294302
}

cpp/src/parquet/file/metadata.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ class PARQUET_EXPORT FileMetaData {
106106
~FileMetaData();
107107

108108
// file metadata
109+
uint32_t size() const;
109110
int num_columns() const;
110111
int64_t num_rows() const;
111112
int num_row_groups() const;

0 commit comments

Comments
 (0)