Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BugFix] Fix memory limit exceeded problem when writing a partitioned Parquet table #43672

Open
wants to merge 2 commits into
base: main
Choose a base branch
from

Conversation

mxdzs0612
Copy link
Contributor

@mxdzs0612 mxdzs0612 commented Apr 7, 2024

Why I'm doing:

We may encounter the following problem when writing a Parquet table with lots of partitions by connector sink. (e.g. partitioned by dayofyear(xxxx))

Used: 23943372128, Limit: 23942823063. Mem usage has exceed the limit of query pool

This PR fixs the problem by flushing row groups when memory usage of a BE almost reaches the threshold, which is 80% of the total memory by default.

What I'm doing:

Fixes #issue

What type of PR is this:

  • BugFix
  • Feature
  • Enhancement
  • Refactor
  • UT
  • Doc
  • Tool

Does this PR entail a change in behavior?

  • Yes, this PR will result in a change in behavior.
  • No, this PR will not result in a change in behavior.

If yes, please specify the type of change:

  • Interface/UI changes: syntax, type conversion, expression evaluation, display information
  • Parameter changes: default values, similar parameters but with different default values
  • Policy changes: use new policy to replace old one, functionality automatically enabled
  • Feature removed
  • Miscellaneous: upgrade & downgrade compatibility, etc.

Checklist:

  • I have added test cases for my bug fix or my new feature
  • This pr needs user documentation (for new or modified features or behaviors)
    • I have added documentation for my new feature or new function
  • This is a backport pr

Bugfix cherry-pick branch check:

  • I have checked the version labels which the pr will be auto-backported to the target branch
    • 3.3
    • 3.2
    • 3.1
    • 3.0
    • 2.5

@mxdzs0612 mxdzs0612 requested review from a team as code owners April 7, 2024 12:41
@github-actions github-actions bot added the 3.3 label Apr 7, 2024
@mxdzs0612 mxdzs0612 changed the title [BugFix] Fix memory limit exceeded problem when writing a partitioned Parquet file [BugFix] Fix memory limit exceeded problem when writing a partitioned Parquet table Apr 8, 2024
@mxdzs0612 mxdzs0612 force-pushed the sink_flush branch 2 times, most recently from bfef844 to 473ad00 Compare April 8, 2024 12:24
@DorianZheng
Copy link
Contributor

DorianZheng commented Apr 10, 2024

@mxdzs0612 I think we should try to merge small files before committing the final results for further read performance.

@@ -48,7 +49,13 @@ std::future<Status> ParquetFileWriter::write(ChunkPtr chunk) {
if (auto status = _rowgroup_writer->write(chunk.get()); !status.ok()) {
return make_ready_future(std::move(status));
}
if (_rowgroup_writer->estimated_buffered_bytes() >= _writer_options->rowgroup_size) {
double mem_usage = 0.0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's better to implement this in ConnectorSinkOperator so all file formats can benefit from this

@silverbullet233
Copy link
Contributor

silverbullet233 commented Apr 10, 2024

It is not a good choice to sense the memory state within the operator and this should be decided before scheduling. Operator has an interface set_execute_mode. We can use this interface to control the subsequent processing strategy. You can refer to the method of local_exchange_source_operator to get more details.

@silverbullet233
Copy link
Contributor

silverbullet233 commented Apr 10, 2024

It is not a good choice to sense the memory state within the operator and this should be decided before scheduling. Operator has an interface set_execute_mode. We can use this interface to control the subsequent processing strategy. You can refer to the method of local_exchange_source_operator to get more details.

btw, seems that what you want to do is similar to #25053, maybe you can refer it to get more context

Signed-off-by: Jiao Mingye <mxdzs0612@gmail.com>
Signed-off-by: Jiao Mingye <mxdzs0612@gmail.com>
Copy link

[FE Incremental Coverage Report]

pass : 0 / 0 (0%)

Copy link

[BE Incremental Coverage Report]

pass : 6 / 6 (100.00%)

file detail

path covered_line new_line coverage not_covered_line_detail
🔵 be/src/formats/parquet/parquet_file_writer.cpp 6 6 100.00% []

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants