Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 11 additions & 9 deletions velox/dwio/parquet/writer/Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ void Writer::write(const RowVectorPtr& data) {
recordBatch->schema(), recordBatch->columns(), data->size());
if (!arrowWriter_) {
stream_ = std::make_shared<DataBufferSink>(
pool_, queryCtx_->queryConfig().dataBufferGrowRatio());
finalSink_.get(),
pool_,
queryCtx_->queryConfig().dataBufferGrowRatio());
auto arrowProperties = ::parquet::ArrowWriterProperties::Builder().build();
PARQUET_ASSIGN_OR_THROW(
arrowWriter_,
Expand All @@ -45,26 +47,26 @@ void Writer::write(const RowVectorPtr& data) {
}

PARQUET_THROW_NOT_OK(arrowWriter_->WriteTable(*table, 10000));

if (queryCtx_->queryConfig().dataBufferGrowRatio() > 1) {
finalSink_->write(std::move(stream_->dataBuffer()));
flush(); // No performance drop on 1TB dataset.
}
}

void Writer::flush() {
if (arrowWriter_) {
PARQUET_THROW_NOT_OK(arrowWriter_->Close());
arrowWriter_.reset();
finalSink_->write(std::move(stream_->dataBuffer()));
}
PARQUET_THROW_NOT_OK(stream_->Flush());
}

void Writer::newRowGroup(int32_t numRows) {
PARQUET_THROW_NOT_OK(arrowWriter_->NewRowGroup(numRows));
}

void Writer::close() {
flush();
finalSink_->close();
if (arrowWriter_) {
PARQUET_THROW_NOT_OK(arrowWriter_->Close());
arrowWriter_.reset();
}
PARQUET_THROW_NOT_OK(stream_->Close());
}

} // namespace facebook::velox::parquet
17 changes: 13 additions & 4 deletions velox/dwio/parquet/writer/Writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,11 @@ namespace facebook::velox::parquet {
// Utility for capturing Arrow output into a DataBuffer.
class DataBufferSink : public arrow::io::OutputStream {
public:
explicit DataBufferSink(memory::MemoryPool& pool, uint32_t growRatio = 1)
: buffer_(pool), growRatio_(growRatio) {}
explicit DataBufferSink(
dwio::common::DataSink* sink,
memory::MemoryPool& pool,
uint32_t growRatio = 1)
: sink_(sink), buffer_(pool), growRatio_(growRatio) {}

arrow::Status Write(const std::shared_ptr<arrow::Buffer>& data) override {
buffer_.append(
Expand All @@ -53,28 +56,34 @@ class DataBufferSink : public arrow::io::OutputStream {
}

arrow::Status Flush() override {
bytesFlushed_ += buffer_.size();
sink_->write(std::move(buffer_));
return arrow::Status::OK();
}

arrow::Result<int64_t> Tell() const override {
return buffer_.size();
return bytesFlushed_ + buffer_.size();
}

arrow::Status Close() override {
ARROW_RETURN_NOT_OK(Flush());
sink_->close();
return arrow::Status::OK();
}

bool closed() const override {
return false;
return sink_->isClosed();
}

dwio::common::DataBuffer<char>& dataBuffer() {
return buffer_;
}

private:
dwio::common::DataSink* sink_;
dwio::common::DataBuffer<char> buffer_;
uint32_t growRatio_ = 1;
int64_t bytesFlushed_ = 0;
};

// Writes Velox vectors into a DataSink using Arrow Parquet writer.
Expand Down