PARQUET-711: Use metadata builders in parquet writer
I wrote a sample file and the metadata seems to be correct.
@xhochy I fixed some missing metadata fields, such as `dictionary_page_offset`. You might want to check whether this fixes the Drill problem.

Author: Deepak Majeti <deepak.majeti@hpe.com>

Closes apache#156 from majetideepak/PARQUET-711 and squashes the following commits:

25f5a7e [Deepak Majeti] fix schema and descr. Resolves PARQUET-705 and PARQUET-707
8b4784d [Deepak Majeti] Review comments to add methods back
fdbc761 [Deepak Majeti] fix clang error and comments
c6cb071 [Deepak Majeti] convert DCHECKS to Exceptions in metadata
ada3ac2 [Deepak Majeti] clang format
d9c9131 [Deepak Majeti] Use metadata builders in parquet writer
Deepak Majeti authored and wesm committed Sep 2, 2018
1 parent 56469ae commit 028a289
Showing 13 changed files with 190 additions and 184 deletions.
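Editor's note: the patch replaces direct manipulation of the Thrift `format::*` structs in the writer path with the metadata builder classes. A minimal sketch of the resulting builder flow, modeled on the updated `file-metadata-test.cc` below; the include paths, the helper name, and the literal offset/size values are illustrative assumptions, not taken from the patch.

#include <memory>

#include "parquet/column/properties.h"   // assumed header for WriterProperties
#include "parquet/file/metadata.h"       // assumed header for the builder classes

namespace parquet {

// Builds metadata for a single row group of `nrows` rows, mirroring the
// sequence in TestBuildAccess. Offsets and sizes passed to Finish() are
// placeholder values.
std::unique_ptr<FileMetaData> BuildExampleMetadata(
    const SchemaDescriptor* schema, int64_t nrows) {
  std::shared_ptr<WriterProperties> props = default_writer_properties();
  auto f_builder = FileMetaDataBuilder::Make(schema, props);

  // The expected row count is now supplied when the row group is appended.
  RowGroupMetaDataBuilder* rg_builder = f_builder->AppendRowGroup(nrows);

  int64_t total_bytes_written = 0;
  for (int i = 0; i < rg_builder->num_columns(); ++i) {
    ColumnChunkMetaDataBuilder* col_builder = rg_builder->NextColumnChunk();
    // Finish(num_values, dictionary_page_offset, index_page_offset,
    //        data_page_offset, compressed_size, uncompressed_size,
    //        dictionary_fallback)
    col_builder->Finish(nrows, 4, 0, 10, 512, 600, false);
    total_bytes_written += 512;
  }

  // RowGroupMetaDataBuilder::Finish() now takes the bytes written for the row
  // group, which is checked against the sum of per-column compressed sizes.
  rg_builder->Finish(total_bytes_written);

  return f_builder->Finish();
}

}  // namespace parquet

The row count moves into `AppendRowGroup()`, and the row-group `Finish()` receives the total bytes written rather than the row count, which is exactly what the updated test asserts.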
22 changes: 13 additions & 9 deletions cpp/src/parquet/column/column-io-benchmark.cc
@@ -25,13 +25,13 @@

namespace parquet {

-using format::ColumnChunk;
using schema::PrimitiveNode;

namespace benchmark {

std::unique_ptr<Int64Writer> BuildWriter(int64_t output_size, OutputStream* dst,
-ColumnChunk* metadata, ColumnDescriptor* schema, const WriterProperties* properties) {
+ColumnChunkMetaDataBuilder* metadata, ColumnDescriptor* schema,
+const WriterProperties* properties) {
std::unique_ptr<SerializedPageWriter> pager(
new SerializedPageWriter(dst, Compression::UNCOMPRESSED, metadata));
return std::unique_ptr<Int64Writer>(new Int64Writer(
@@ -57,17 +57,19 @@ void SetBytesProcessed(::benchmark::State& state, Repetition::type repetition) {

template <Repetition::type repetition>
static void BM_WriteInt64Column(::benchmark::State& state) {
-format::ColumnChunk metadata;
+format::ColumnChunk thrift_metadata;
std::vector<int64_t> values(state.range_x(), 128);
std::vector<int16_t> definition_levels(state.range_x(), 1);
std::vector<int16_t> repetition_levels(state.range_x(), 0);
std::shared_ptr<ColumnDescriptor> schema = Int64Schema(repetition);
-std::shared_ptr<parquet::WriterProperties> properties = default_writer_properties();
+std::shared_ptr<WriterProperties> properties = default_writer_properties();
+auto metadata = ColumnChunkMetaDataBuilder::Make(
+properties, schema.get(), reinterpret_cast<uint8_t*>(&thrift_metadata));

while (state.KeepRunning()) {
InMemoryOutputStream dst;
-std::unique_ptr<Int64Writer> writer =
-BuildWriter(state.range_x(), &dst, &metadata, schema.get(), properties.get());
+std::unique_ptr<Int64Writer> writer = BuildWriter(
+state.range_x(), &dst, metadata.get(), schema.get(), properties.get());
writer->WriteBatch(
values.size(), definition_levels.data(), repetition_levels.data(), values.data());
writer->Close();
@@ -91,16 +93,18 @@ std::unique_ptr<Int64Reader> BuildReader(

template <Repetition::type repetition>
static void BM_ReadInt64Column(::benchmark::State& state) {
-format::ColumnChunk metadata;
+format::ColumnChunk thrift_metadata;
std::vector<int64_t> values(state.range_x(), 128);
std::vector<int16_t> definition_levels(state.range_x(), 1);
std::vector<int16_t> repetition_levels(state.range_x(), 0);
std::shared_ptr<ColumnDescriptor> schema = Int64Schema(repetition);
+std::shared_ptr<WriterProperties> properties = default_writer_properties();
+auto metadata = ColumnChunkMetaDataBuilder::Make(
+properties, schema.get(), reinterpret_cast<uint8_t*>(&thrift_metadata));

InMemoryOutputStream dst;
-std::shared_ptr<parquet::WriterProperties> properties = default_writer_properties();
std::unique_ptr<Int64Writer> writer =
-BuildWriter(state.range_x(), &dst, &metadata, schema.get(), properties.get());
+BuildWriter(state.range_x(), &dst, metadata.get(), schema.get(), properties.get());
writer->WriteBatch(
values.size(), definition_levels.data(), repetition_levels.data(), values.data());
writer->Close();
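Editor's note: the wiring the benchmark now uses can be summarized as follows — the caller owns a `format::ColumnChunk`, the builder wraps it, and `SerializedPageWriter` records offsets and sizes through the builder rather than writing the struct directly. A hedged sketch; the header paths, the helper signature, and the lifetime comment are assumptions, not part of the patch.

#include <memory>

#include "parquet/file/metadata.h"          // assumed; ColumnChunkMetaDataBuilder
#include "parquet/file/writer-internal.h"   // assumed; SerializedPageWriter
#include "parquet/thrift/parquet_types.h"   // assumed; format::ColumnChunk

namespace parquet {

// The caller owns the Thrift struct; the builder is a typed view over it, and
// the page writer records offsets/sizes through the builder. The builder is
// assumed to outlive the page writer that uses it.
std::unique_ptr<SerializedPageWriter> MakePageWriter(OutputStream* sink,
    const ColumnDescriptor* descr, const std::shared_ptr<WriterProperties>& props,
    format::ColumnChunk* thrift_metadata,
    std::unique_ptr<ColumnChunkMetaDataBuilder>* metadata) {
  *metadata = ColumnChunkMetaDataBuilder::Make(
      props, descr, reinterpret_cast<uint8_t*>(thrift_metadata));
  // SerializedPageWriter now takes the builder instead of format::ColumnChunk*.
  return std::unique_ptr<SerializedPageWriter>(new SerializedPageWriter(
      sink, Compression::UNCOMPRESSED, metadata->get()));
}

}  // namespace parquet

An `Int64Writer` is then constructed over the returned pager, as `BuildWriter()` does above.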
16 changes: 10 additions & 6 deletions cpp/src/parquet/column/column-writer-test.cc
@@ -74,6 +74,8 @@ class TestPrimitiveWriter : public ::testing::Test {
repetition_levels_out_.resize(SMALL_SIZE);

SetUpSchemaRequired();
+metadata_accessor_ =
+ColumnChunkMetaData::Make(reinterpret_cast<uint8_t*>(&thrift_metadata_));
}

void BuildReader() {
@@ -87,8 +89,10 @@
std::shared_ptr<TypedColumnWriter<TestType>> BuildWriter(
int64_t output_size = SMALL_SIZE, Encoding::type encoding = Encoding::PLAIN) {
sink_.reset(new InMemoryOutputStream());
-std::unique_ptr<SerializedPageWriter> pager(
-new SerializedPageWriter(sink_.get(), Compression::UNCOMPRESSED, &metadata_));
+metadata_ = ColumnChunkMetaDataBuilder::Make(
+writer_properties_, schema_.get(), reinterpret_cast<uint8_t*>(&thrift_metadata_));
+std::unique_ptr<SerializedPageWriter> pager(new SerializedPageWriter(
+sink_.get(), Compression::UNCOMPRESSED, metadata_.get()));
WriterProperties::Builder wp_builder;
if (encoding == Encoding::PLAIN_DICTIONARY || encoding == Encoding::RLE_DICTIONARY) {
wp_builder.enable_dictionary();
@@ -126,9 +130,7 @@ class TestPrimitiveWriter : public ::testing::Test {
ASSERT_EQ(this->values_, this->values_out_);
}

-int64_t metadata_num_values() const {
-return metadata_.meta_data.num_values;
-}
+int64_t metadata_num_values() const { return metadata_accessor_->num_values(); }

protected:
int64_t values_read_;
@@ -152,7 +154,9 @@ class TestPrimitiveWriter : public ::testing::Test {

private:
NodePtr node_;
-format::ColumnChunk metadata_;
+format::ColumnChunk thrift_metadata_;
+std::unique_ptr<ColumnChunkMetaDataBuilder> metadata_;
+std::unique_ptr<ColumnChunkMetaData> metadata_accessor_;
std::shared_ptr<ColumnDescriptor> schema_;
std::unique_ptr<InMemoryOutputStream> sink_;
std::shared_ptr<WriterProperties> writer_properties_;
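Editor's note: the fixture above keeps two views over the same Thrift struct — a `ColumnChunkMetaDataBuilder` for the write side and a `ColumnChunkMetaData` accessor for assertions. A short sketch of the read-back step; include paths are assumed.

#include <memory>

#include "parquet/file/metadata.h"          // assumed
#include "parquet/thrift/parquet_types.h"   // assumed; format::ColumnChunk

namespace parquet {

// Reads back a value recorded by the builder without re-parsing the file:
// the accessor is just another view over the same Thrift struct.
int64_t RecordedNumValues(format::ColumnChunk* thrift_metadata) {
  std::unique_ptr<ColumnChunkMetaData> accessor =
      ColumnChunkMetaData::Make(reinterpret_cast<uint8_t*>(thrift_metadata));
  return accessor->num_values();  // what metadata_num_values() now reports
}

}  // namespace parquet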
8 changes: 4 additions & 4 deletions cpp/src/parquet/file/file-metadata-test.cc
@@ -54,8 +54,8 @@ TEST(Metadata, TestBuildAccess) {
stats_float.max = &float_max;

auto f_builder = FileMetaDataBuilder::Make(&schema, props);
-auto rg1_builder = f_builder->AppendRowGroup();
-auto rg2_builder = f_builder->AppendRowGroup();
+auto rg1_builder = f_builder->AppendRowGroup(nrows / 2);
+auto rg2_builder = f_builder->AppendRowGroup(nrows / 2);

// Write the metadata
// rowgroup1 metadata
@@ -66,7 +66,7 @@
col2_builder->SetStatistics(stats_float);
col1_builder->Finish(nrows / 2, 4, 0, 10, 512, 600, false);
col2_builder->Finish(nrows / 2, 24, 0, 30, 512, 600, false);
-rg1_builder->Finish(nrows / 2);
+rg1_builder->Finish(1024);

// rowgroup2 metadata
col1_builder = rg2_builder->NextColumnChunk();
@@ -76,7 +76,7 @@
col2_builder->SetStatistics(stats_float);
col1_builder->Finish(nrows / 2, 6, 0, 10, 512, 600, false);
col2_builder->Finish(nrows / 2, 16, 0, 26, 512, 600, false);
-rg2_builder->Finish(nrows / 2);
+rg2_builder->Finish(1024);

// Read the metadata
auto f_accessor = f_builder->Finish();
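Editor's note: after `f_builder->Finish()`, the test reads everything back through the accessor classes. A sketch of that read path; the accessor methods follow the impl classes visible in `metadata.cc`, but the exact public forwarding signatures are assumed.

#include <memory>

#include "parquet/exception.h"       // assumed; ParquetException
#include "parquet/file/metadata.h"   // assumed

namespace parquet {

// Walks the metadata returned by FileMetaDataBuilder::Finish(). Out-of-range
// indices now raise ParquetException in release builds instead of only
// tripping a debug assertion.
void InspectMetadata(FileMetaData* file_metadata) {
  for (int rg = 0; rg < file_metadata->num_row_groups(); ++rg) {
    std::unique_ptr<RowGroupMetaData> rg_meta = file_metadata->RowGroup(rg);
    for (int col = 0; col < rg_meta->num_columns(); ++col) {
      std::unique_ptr<ColumnChunkMetaData> cc_meta = rg_meta->ColumnChunk(col);
      int64_t num_values = cc_meta->num_values();  // filled in by the builder
      (void)num_values;
    }
  }
}

}  // namespace parquet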
96 changes: 63 additions & 33 deletions cpp/src/parquet/file/metadata.cc
@@ -180,8 +180,12 @@ class RowGroupMetaData::RowGroupMetaDataImpl {
inline const SchemaDescriptor* schema() const { return schema_; }

std::unique_ptr<ColumnChunkMetaData> ColumnChunk(int i) {
-DCHECK(i < num_columns()) << "The file only has " << num_columns()
-<< " columns, requested metadata for column: " << i;
+if (!(i < num_columns())) {
+std::stringstream ss;
+ss << "The file only has " << num_columns()
+<< " columns, requested metadata for column: " << i;
+throw ParquetException(ss.str());
+}
return ColumnChunkMetaData::Make(
reinterpret_cast<const uint8_t*>(&row_group_->columns[i]));
}
@@ -244,14 +248,17 @@ class FileMetaData::FileMetaDataImpl {
void WriteTo(OutputStream* dst) { SerializeThriftMsg(metadata_.get(), 1024, dst); }

std::unique_ptr<RowGroupMetaData> RowGroup(int i) {
-DCHECK(i < num_row_groups())
-<< "The file only has " << num_row_groups()
-<< " row groups, requested metadata for row group: " << i;
+if (!(i < num_row_groups())) {
+std::stringstream ss;
+ss << "The file only has " << num_row_groups()
+<< " row groups, requested metadata for row group: " << i;
+throw ParquetException(ss.str());
+}
return RowGroupMetaData::Make(
reinterpret_cast<const uint8_t*>(&metadata_->row_groups[i]), &schema_);
}

-const SchemaDescriptor* schema_descriptor() const { return &schema_; }
+const SchemaDescriptor* schema() const { return &schema_; }

private:
friend FileMetaDataBuilder;
@@ -306,8 +313,8 @@ int FileMetaData::num_schema_elements() const {
return impl_->num_schema_elements();
}

-const SchemaDescriptor* FileMetaData::schema_descriptor() const {
-return impl_->schema_descriptor();
+const SchemaDescriptor* FileMetaData::schema() const {
+return impl_->schema();
}

void FileMetaData::WriteTo(OutputStream* dst) {
@@ -374,6 +381,8 @@ class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl {
column_chunk_->meta_data.__set_encodings(thrift_encodings);
}

+const ColumnDescriptor* descr() const { return column_; }

private:
format::ColumnChunk* column_chunk_;
const std::shared_ptr<WriterProperties> properties_;
@@ -406,24 +415,33 @@ void ColumnChunkMetaDataBuilder::Finish(int64_t num_values,
compressed_size, uncompressed_size, dictionary_fallback);
}

+const ColumnDescriptor* ColumnChunkMetaDataBuilder::descr() const {
+return impl_->descr();
+}

void ColumnChunkMetaDataBuilder::SetStatistics(const ColumnStatistics& result) {
impl_->SetStatistics(result);
}

class RowGroupMetaDataBuilder::RowGroupMetaDataBuilderImpl {
public:
-explicit RowGroupMetaDataBuilderImpl(const std::shared_ptr<WriterProperties>& props,
-const SchemaDescriptor* schema, uint8_t* contents)
+explicit RowGroupMetaDataBuilderImpl(int64_t num_rows,
+const std::shared_ptr<WriterProperties>& props, const SchemaDescriptor* schema,
+uint8_t* contents)
: properties_(props), schema_(schema), current_column_(0) {
row_group_ = reinterpret_cast<format::RowGroup*>(contents);
InitializeColumns(schema->num_columns());
+row_group_->__set_num_rows(num_rows);
}
~RowGroupMetaDataBuilderImpl() {}

ColumnChunkMetaDataBuilder* NextColumnChunk() {
-DCHECK(current_column_ < num_columns())
-<< "The schema only has " << num_columns()
-<< " columns, requested metadata for column: " << current_column_;
+if (!(current_column_ < num_columns())) {
+std::stringstream ss;
+ss << "The schema only has " << num_columns()
+<< " columns, requested metadata for column: " << current_column_;
+throw ParquetException(ss.str());
+}
auto column = schema_->Column(current_column_);
auto column_builder = ColumnChunkMetaDataBuilder::Make(properties_, column,
reinterpret_cast<uint8_t*>(&row_group_->columns[current_column_++]));
@@ -432,25 +450,32 @@ class RowGroupMetaDataBuilder::RowGroupMetaDataBuilderImpl {
return column_builder_ptr;
}

-void Finish(int64_t num_rows) {
-DCHECK(current_column_ == schema_->num_columns())
-<< "Only " << current_column_ - 1 << " out of " << schema_->num_columns()
-<< " columns are initialized";
-size_t total_byte_size = 0;
+void Finish(int64_t total_bytes_written) {
+if (!(current_column_ == schema_->num_columns())) {
+std::stringstream ss;
+ss << "Only " << current_column_ - 1 << " out of " << schema_->num_columns()
+<< " columns are initialized";
+throw ParquetException(ss.str());
+}
+int64_t total_byte_size = 0;

for (int i = 0; i < schema_->num_columns(); i++) {
-DCHECK(row_group_->columns[i].file_offset > 0) << "Column " << i
-<< " is not complete.";
+if (!(row_group_->columns[i].file_offset > 0)) {
+std::stringstream ss;
+ss << "Column " << i << " is not complete.";
+throw ParquetException(ss.str());
+}
total_byte_size += row_group_->columns[i].meta_data.total_compressed_size;
}
+DCHECK(total_bytes_written == total_byte_size)
+<< "Total bytes in this RowGroup does not match with compressed sizes of columns";

row_group_->__set_total_byte_size(total_byte_size);
-row_group_->__set_num_rows(num_rows);
}

-private:
int num_columns() { return row_group_->columns.size(); }

+private:
void InitializeColumns(int ncols) { row_group_->columns.resize(ncols); }

format::RowGroup* row_group_;
@@ -460,30 +485,35 @@ class RowGroupMetaDataBuilder::RowGroupMetaDataBuilderImpl {
int current_column_;
};

-std::unique_ptr<RowGroupMetaDataBuilder> RowGroupMetaDataBuilder::Make(
+std::unique_ptr<RowGroupMetaDataBuilder> RowGroupMetaDataBuilder::Make(int64_t num_rows,
const std::shared_ptr<WriterProperties>& props, const SchemaDescriptor* schema_,
uint8_t* contents) {
return std::unique_ptr<RowGroupMetaDataBuilder>(
-new RowGroupMetaDataBuilder(props, schema_, contents));
+new RowGroupMetaDataBuilder(num_rows, props, schema_, contents));
}

-RowGroupMetaDataBuilder::RowGroupMetaDataBuilder(
+RowGroupMetaDataBuilder::RowGroupMetaDataBuilder(int64_t num_rows,
const std::shared_ptr<WriterProperties>& props, const SchemaDescriptor* schema_,
uint8_t* contents)
: impl_{std::unique_ptr<RowGroupMetaDataBuilderImpl>(
-new RowGroupMetaDataBuilderImpl(props, schema_, contents))} {}
+new RowGroupMetaDataBuilderImpl(num_rows, props, schema_, contents))} {}

RowGroupMetaDataBuilder::~RowGroupMetaDataBuilder() {}

ColumnChunkMetaDataBuilder* RowGroupMetaDataBuilder::NextColumnChunk() {
return impl_->NextColumnChunk();
}

-void RowGroupMetaDataBuilder::Finish(int64_t num_rows) {
-impl_->Finish(num_rows);
+int RowGroupMetaDataBuilder::num_columns() {
+return impl_->num_columns();
}

+void RowGroupMetaDataBuilder::Finish(int64_t total_bytes_written) {
+impl_->Finish(total_bytes_written);
}

// file metadata
// TODO(PARQUET-595) Support key_value_metadata
class FileMetaDataBuilder::FileMetaDataBuilderImpl {
public:
explicit FileMetaDataBuilderImpl(
@@ -493,10 +523,10 @@ class FileMetaDataBuilder::FileMetaDataBuilderImpl {
}
~FileMetaDataBuilderImpl() {}

-RowGroupMetaDataBuilder* AppendRowGroup() {
+RowGroupMetaDataBuilder* AppendRowGroup(int64_t num_rows) {
auto row_group = std::unique_ptr<format::RowGroup>(new format::RowGroup());
auto row_group_builder = RowGroupMetaDataBuilder::Make(
-properties_, schema_, reinterpret_cast<uint8_t*>(row_group.get()));
+num_rows, properties_, schema_, reinterpret_cast<uint8_t*>(row_group.get()));
RowGroupMetaDataBuilder* row_group_ptr = row_group_builder.get();
row_group_builders_.push_back(std::move(row_group_builder));
row_groups_.push_back(std::move(row_group));
@@ -517,7 +547,7 @@ class FileMetaDataBuilder::FileMetaDataBuilderImpl {
metadata_->__set_version(properties_->version());
metadata_->__set_created_by(properties_->created_by());
parquet::schema::SchemaFlattener flattener(
-static_cast<parquet::schema::GroupNode*>(schema_->schema().get()),
+static_cast<parquet::schema::GroupNode*>(schema_->schema_root().get()),
&metadata_->schema);
flattener.Flatten();
auto file_meta_data = std::unique_ptr<FileMetaData>(new FileMetaData());
@@ -548,8 +578,8 @@ FileMetaDataBuilder::FileMetaDataBuilder(

FileMetaDataBuilder::~FileMetaDataBuilder() {}

-RowGroupMetaDataBuilder* FileMetaDataBuilder::AppendRowGroup() {
-return impl_->AppendRowGroup();
+RowGroupMetaDataBuilder* FileMetaDataBuilder::AppendRowGroup(int64_t num_rows) {
+return impl_->AppendRowGroup(num_rows);
}

std::unique_ptr<FileMetaData> FileMetaDataBuilder::Finish() {
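Editor's note: every bounds check converted in `metadata.cc` follows the same shape — the `DCHECK`, which compiles away in release builds, becomes an explicit check that throws `ParquetException`, so bad indices fail loudly for library users. Distilled idiom (not a literal excerpt; the header path is assumed):

#include <sstream>

#include "parquet/exception.h"   // assumed; ParquetException

namespace parquet {

// Mirrors the converted check in RowGroupMetaDataImpl::ColumnChunk(int i).
void CheckColumnIndex(int i, int num_columns) {
  if (!(i < num_columns)) {
    std::stringstream ss;
    ss << "The file only has " << num_columns
       << " columns, requested metadata for column: " << i;
    throw ParquetException(ss.str());
  }
}

}  // namespace parquet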
(The diff for the remaining 9 changed files is not shown here.)
