Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipeline: support spill for fine grained aggregation #7220

Merged
merged 36 commits into from
Apr 20, 2023

Conversation

SeaRise
Copy link
Contributor

@SeaRise SeaRise commented Apr 5, 2023

What problem does this PR solve?

Issue Number: ref #6518

Problem Summary:

What is changed and how it works?

wait fot #7195

  • split Aggregator.executeOnBlock to Aggregator.executeOnBlock and Aggregator.spill in order to support cpu/io thread pool in pipeline model.
  • remove useless param enable_memory_tracker in ExecutorTestUtils, aggregator does not depend on memory tracker now.
  • add LocalAggregateRestorer to support restoring disk data for local agg.
/**
 * ┌──────────────────────────────────────────────────┐
 * │  {bucket0, bucket1, ... bucket256}spilled_file0──┼────┐
 * │  {bucket0, bucket1, ... bucket256}spilled_file1──┼────┤
 * │  {bucket0, bucket1, ... bucket256}spilled_file2──┼────┤
 * │  ...                                             │    │
 * │  {bucket0, bucket1, ... bucket256}spilled_filen──┼────┤
 * └──────────────────────────────────────────────────┘    │
 *                                                         │ loadBucketData
 *                   bucket_data◄──────────────────────────┘
 *                      │
 *                      │ tryPop
 *                      ▼
 *                restored_blocks
 */
  • support spill/restore in LocalAggregateTransformOp: add spill/final_spill/restore phase.
  • add unit test for fine grained shuffle agg in gtest_spill_aggregation.cpp.

Check List

Tests

  • Unit test
  • Integration test
  • Manual test (add detailed scripts or steps below)
  • No code

Side effects

  • Performance regression: Consumes more CPU
  • Performance regression: Consumes more Memory
  • Breaking backward compatibility

Documentation

  • Affects user behaviors
  • Contains syntax changes
  • Contains variable changes
  • Contains experimental features
  • Changes MySQL compatibility

Release note

None

@ti-chi-bot
Copy link
Member

ti-chi-bot commented Apr 5, 2023

[REVIEW NOTIFICATION]

This pull request has been approved by:

  • hongyunyan
  • ywqzzy

To complete the pull request process, please ask the reviewers in the list to review by filling /cc @reviewer in the comment.
After your PR has acquired the required number of LGTMs, you can assign this pull request to the committer in the list by filling /assign @committer in the comment to help you merge this pull request.

The full list of commands accepted by this bot can be found here.

Reviewer can indicate their review by submitting an approval review.
Reviewer can cancel approval by submitting a request changes review.

@ti-chi-bot ti-chi-bot added do-not-merge/needs-linked-issue release-note-none Denotes a PR that doesn't merit a release note. size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files. labels Apr 5, 2023
@SeaRise SeaRise changed the title Pipeline: support spill for non fine grained aggregate WIP: Pipeline: support spill for non fine grained aggregate Apr 5, 2023
@ti-chi-bot ti-chi-bot added do-not-merge/work-in-progress Indicates that a PR should not merge because it is a work in progress. and removed do-not-merge/needs-linked-issue labels Apr 5, 2023
@SeaRise SeaRise mentioned this pull request Apr 5, 2023
25 tasks
@SeaRise
Copy link
Contributor Author

SeaRise commented Apr 5, 2023

/run-all-tests

@SeaRise
Copy link
Contributor Author

SeaRise commented Apr 5, 2023

/run-all-tests

@SeaRise
Copy link
Contributor Author

SeaRise commented Apr 19, 2023

/run-unit-test

Copy link
Contributor

@ywqzzy ywqzzy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rest LGTM

dbms/src/Operators/LocalAggregateRestorer.cpp Outdated Show resolved Hide resolved
assert(status.load() == AggStatus::build);
aggregator->finishSpill();
LOG_INFO(log, "Begin restore data from disk for local aggregation.");
auto input_streams = aggregator->restoreSpilledData();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If input_stream size == 0, we can skip creating LocalAggregateRestorer, add the operator will not switch to io pool

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point!
Since aggregator.hasSpilledData is true, input_streams must not be empty.
RUNTIME_CHECK(!input_streams.empty) added.

{
for (const auto & bucket_stream : bucket_streams)
bucket_inputs.emplace_back(bucket_stream);
if (bucket_inputs.empty())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is is possible?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed and added assert(!bucket_inputs.empty());

@ti-chi-bot ti-chi-bot added the status/LGT1 Indicates that a PR has LGTM 1. label Apr 19, 2023
return OperatorStatus::HAS_OUTPUT;
return agg_context.hasSpilledData()
? fromBuildToFinalSpillOrRestore()
: fromBuildToConvergent(block);
}
agg_context.buildOnBlock(task_index, block);
block.clear();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why we need to clear the block?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to release the memory earlier, the block written to the sink will not be used anymore

Comment on lines -33 to -42
if (unlikely(aggregate_context->useNullSource()))
{
group_builder.init(1);
group_builder.transform([&](auto & builder) {
builder.setSourceOp(std::make_unique<NullSourceOp>(
exec_status,
aggregate_context->getHeader(),
log->identifier()));
});
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why we need to delete these codes?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logic of AggContext::useNullSource is wrapped into AggContext::getConvergentConcurrency and AggContext::readForConvergent, just to simplify the code.

Copy link
Contributor

@xzhangxian1008 xzhangxian1008 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@ti-chi-bot
Copy link
Member

@xzhangxian1008: Thanks for your review. The bot only counts approvals from reviewers and higher roles in list, but you're still welcome to leave your comments.

In response to this:

LGTM

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the ti-community-infra/tichi repository.

@ti-chi-bot ti-chi-bot added status/LGT2 Indicates that a PR has LGTM 2. and removed status/LGT1 Indicates that a PR has LGTM 1. labels Apr 20, 2023
@SeaRise
Copy link
Contributor Author

SeaRise commented Apr 20, 2023

/merge

@ti-chi-bot
Copy link
Member

@SeaRise: It seems you want to merge this PR, I will help you trigger all the tests:

/run-all-tests

You only need to trigger /merge once, and if the CI test fails, you just re-trigger the test that failed and the bot will merge the PR for you after the CI passes.

If you have any questions about the PR merge process, please refer to pr process.

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the ti-community-infra/tichi repository.

@ti-chi-bot
Copy link
Member

This pull request has been accepted and is ready to merge.

Commit hash: 3d49d23

@ti-chi-bot ti-chi-bot added the status/can-merge Indicates a PR has been approved by a committer. label Apr 20, 2023
@ti-chi-bot ti-chi-bot removed the status/can-merge Indicates a PR has been approved by a committer. label Apr 20, 2023
@SeaRise
Copy link
Contributor Author

SeaRise commented Apr 20, 2023

/merge

@ti-chi-bot
Copy link
Member

@SeaRise: It seems you want to merge this PR, I will help you trigger all the tests:

/run-all-tests

You only need to trigger /merge once, and if the CI test fails, you just re-trigger the test that failed and the bot will merge the PR for you after the CI passes.

If you have any questions about the PR merge process, please refer to pr process.

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the ti-community-infra/tichi repository.

@ti-chi-bot
Copy link
Member

This pull request has been accepted and is ready to merge.

Commit hash: 7b0b566

@ti-chi-bot ti-chi-bot added the status/can-merge Indicates a PR has been approved by a committer. label Apr 20, 2023
@ti-chi-bot ti-chi-bot merged commit 3ef3154 into pingcap:master Apr 20, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
release-note-none Denotes a PR that doesn't merit a release note. size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files. status/can-merge Indicates a PR has been approved by a committer. status/LGT2 Indicates that a PR has LGTM 2.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants