 
 namespace facebook::velox::parquet {
 
+void Writer::flush() {
+  if (stagingRows_ > 0) {
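+    // Create the Arrow writer lazily on the first flush, after write() has
+    // captured the schema from the first input batch.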
+    if (!arrowWriter_) {
+      stream_ = std::make_shared<DataBufferSink>(
+          pool_, queryCtx_->queryConfig().dataBufferGrowRatio());
+      auto arrowProperties =
+          ::parquet::ArrowWriterProperties::Builder().build();
+      PARQUET_ASSIGN_OR_THROW(
+          arrowWriter_,
+          ::parquet::arrow::FileWriter::Open(
+              *schema_,
+              arrow::default_memory_pool(),
+              stream_,
+              properties_,
+              arrowProperties));
+    }
+
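+    // Stitch the staged per-column arrays into one ChunkedArray per column.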
+    auto fields = schema_->fields();
+    std::vector<std::shared_ptr<arrow::ChunkedArray>> chunks;
+    for (int colIdx = 0; colIdx < fields.size(); colIdx++) {
+      auto dataType = fields.at(colIdx)->type();
+      auto chunk = arrow::ChunkedArray::Make(
+                       std::move(stagingChunks_.at(colIdx)), dataType)
+                       .ValueOrDie();
+      chunks.push_back(chunk);
+    }
+    auto table = arrow::Table::Make(schema_, std::move(chunks), stagingRows_);
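+    // WriteTable splits the staged table into row groups of at most
+    // maxRowGroupRows_ rows each.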
+    PARQUET_THROW_NOT_OK(arrowWriter_->WriteTable(*table, maxRowGroupRows_));
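+    // When the data-buffer grow ratio is enabled, hand the buffered bytes to
+    // the final sink after every flush instead of only at close().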
+    if (queryCtx_->queryConfig().dataBufferGrowRatio() > 1) {
+      finalSink_->write(std::move(stream_->dataBuffer()));
+    }
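+    // Drop the staging references so the staged Arrow arrays can be freed.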
+    for (auto& chunk : stagingChunks_) {
+      chunk.clear();
+    }
+    stagingRows_ = 0;
+    stagingBytes_ = 0;
+  }
+}
+
+/**
+ * Caches the input `RowVector`s to build up larger row groups. The staged
+ * data is flushed before appending a batch that would push:
+ * - the cached row count past `maxRowGroupRows_`, or
+ * - the cached bytes past `maxRowGroupBytes_`.
+ *
+ * Assumes every input `RowVector` has the same schema.
+ */
 void Writer::write(const RowVectorPtr& data) {
   ArrowArray array;
   ArrowSchema schema;
   exportToArrow(data, array, &pool_);
   exportToArrow(data, schema);
   PARQUET_ASSIGN_OR_THROW(
       auto recordBatch, arrow::ImportRecordBatch(&array, &schema));
-  auto table = arrow::Table::Make(
-      recordBatch->schema(), recordBatch->columns(), data->size());
-  if (!arrowWriter_) {
-    stream_ = std::make_shared<DataBufferSink>(
-        pool_, queryCtx_->queryConfig().dataBufferGrowRatio());
-    auto arrowProperties = ::parquet::ArrowWriterProperties::Builder().build();
-    PARQUET_ASSIGN_OR_THROW(
-        arrowWriter_,
-        ::parquet::arrow::FileWriter::Open(
-            *recordBatch->schema(),
-            arrow::default_memory_pool(),
-            stream_,
-            properties_,
-            arrowProperties));
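+  // On the first batch, capture the schema and create one empty staging
+  // chunk list per column.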
+  if (!schema_) {
+    schema_ = recordBatch->schema();
+    for (int colIdx = 0; colIdx < schema_->num_fields(); colIdx++) {
+      stagingChunks_.push_back(std::vector<std::shared_ptr<arrow::Array>>());
+    }
   }
 
-  PARQUET_THROW_NOT_OK(arrowWriter_->WriteTable(*table, 10000));
-  if (queryCtx_->queryConfig().dataBufferGrowRatio() > 1) {
-    finalSink_->write(std::move(stream_->dataBuffer()));
+  auto bytes = data->estimateFlatSize();
+  auto numRows = data->size();
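+  // Flush the staged batches first if accepting this batch would exceed
+  // either row-group threshold.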
+  if (stagingBytes_ + bytes > maxRowGroupBytes_ ||
+      stagingRows_ + numRows > maxRowGroupRows_) {
+    flush();
   }
-}
 
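+  // Stage this batch's columns; only shared_ptr references are copied, not
+  // the column data.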
-void Writer::flush() {
-  if (arrowWriter_) {
-    PARQUET_THROW_NOT_OK(arrowWriter_->Close());
-    arrowWriter_.reset();
-    finalSink_->write(std::move(stream_->dataBuffer()));
+  for (int colIdx = 0; colIdx < recordBatch->num_columns(); colIdx++) {
+    auto array = recordBatch->column(colIdx);
+    stagingChunks_.at(colIdx).push_back(array);
   }
+  stagingRows_ += numRows;
+  stagingBytes_ += bytes;
 }
 
 void Writer::newRowGroup(int32_t numRows) {
@@ -64,7 +99,16 @@ void Writer::newRowGroup(int32_t numRows) {
 
 void Writer::close() {
   flush();
+
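+  // Close the Arrow writer to finalize the Parquet footer, then forward any
+  // remaining buffered bytes to the sink.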
+  if (arrowWriter_) {
+    PARQUET_THROW_NOT_OK(arrowWriter_->Close());
+    arrowWriter_.reset();
+    finalSink_->write(std::move(stream_->dataBuffer()));
+  }
+
   finalSink_->close();
+
+  stagingChunks_.clear();
 }
 
 } // namespace facebook::velox::parquet
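To make the staging policy concrete, here is a minimal, self-contained sketch (not Velox code; the threshold values and the `write` lambda are hypothetical stand-ins for `Writer::write()` and its `maxRowGroupRows_`/`maxRowGroupBytes_` members):

#include <cstdint>
#include <iostream>

int main() {
  // Hypothetical thresholds; the commit reads the real values from
  // maxRowGroupRows_ / maxRowGroupBytes_.
  const int64_t maxRowGroupRows = 10000;
  const int64_t maxRowGroupBytes = 128 * 1024 * 1024;
  int64_t stagingRows = 0;
  int64_t stagingBytes = 0;

  // Mirrors the check at the top of Writer::write(): flush the staged data
  // before a batch that would push either counter past its limit.
  auto write = [&](int64_t numRows, int64_t bytes) {
    if (stagingBytes + bytes > maxRowGroupBytes ||
        stagingRows + numRows > maxRowGroupRows) {
      std::cout << "flush: " << stagingRows << " rows, " << stagingBytes
                << " bytes\n";
      stagingRows = 0;
      stagingBytes = 0;
    }
    stagingRows += numRows;
    stagingBytes += bytes;
  };

  // Five 4,000-row batches: the staged 8,000 rows are flushed before the
  // third and fifth batch, so row groups approach the 10,000-row cap instead
  // of mirroring the small input batches.
  for (int i = 0; i < 5; ++i) {
    write(4000, 1 << 20);
  }
}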