Pipeline: support spill for fine grained aggregation #7220
Co-authored-by: yanweiqi <592838129@qq.com>
/run-all-tests
/run-all-tests
/run-unit-test
Rest LGTM
assert(status.load() == AggStatus::build);
aggregator->finishSpill();
LOG_INFO(log, "Begin restore data from disk for local aggregation.");
auto input_streams = aggregator->restoreSpilledData();
If input_streams.size() == 0, we can skip creating LocalAggregateRestorer, and the operator will not switch to the IO pool.
Good point! Since aggregator.hasSpilledData is true, input_streams must not be empty. RUNTIME_CHECK(!input_streams.empty()) added.
{
    for (const auto & bucket_stream : bucket_streams)
        bucket_inputs.emplace_back(bucket_stream);
    if (bucket_inputs.empty())
Is it possible?
removed and added assert(!bucket_inputs.empty());
return OperatorStatus::HAS_OUTPUT;
return agg_context.hasSpilledData()
    ? fromBuildToFinalSpillOrRestore()
    : fromBuildToConvergent(block);
}
agg_context.buildOnBlock(task_index, block);
block.clear();
Why do we need to clear the block?
Just to release the memory earlier; the block written to the sink will not be used anymore.
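The early release discussed in this thread can be sketched roughly as follows. This is only an illustration under stated assumptions: `Block`, `SumAggregator` and `writeToSink` here are hypothetical stand-ins, not the TiFlash classes, and the aggregation is reduced to a running sum. The point is that once the sink has consumed a block, clearing it frees the column buffers right away instead of when the caller's variable goes out of scope.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for a columnar block; NOT the TiFlash Block class.
struct Block
{
    std::vector<std::vector<std::int64_t>> columns;

    // Bytes currently reserved by the column buffers.
    std::size_t allocatedBytes() const
    {
        std::size_t bytes = 0;
        for (const auto & col : columns)
            bytes += col.capacity() * sizeof(std::int64_t);
        return bytes;
    }

    // Swapping with an empty vector releases the column buffers and
    // the outer vector's own capacity.
    void clear() { std::vector<std::vector<std::int64_t>>().swap(columns); }
};

// Hypothetical aggregator that only keeps a running sum of the first column.
struct SumAggregator
{
    std::int64_t sum = 0;

    void buildOnBlock(const Block & block)
    {
        if (block.columns.empty())
            return;
        for (auto v : block.columns[0])
            sum += v;
    }
};

// Sink-side write: consume the block, then drop its memory immediately,
// because a block written to the sink is never read again.
void writeToSink(SumAggregator & agg, Block & block)
{
    agg.buildOnBlock(block);
    block.clear();
}
```

After `writeToSink` returns, the caller still holds a valid but empty `Block`, so reusing the variable for the next input is safe.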
if (unlikely(aggregate_context->useNullSource()))
{
    group_builder.init(1);
    group_builder.transform([&](auto & builder) {
        builder.setSourceOp(std::make_unique<NullSourceOp>(
            exec_status,
            aggregate_context->getHeader(),
            log->identifier()));
    });
}
Why do we need to delete this code?
The logic of AggContext::useNullSource is wrapped into AggContext::getConvergentConcurrency and AggContext::readForConvergent, just to simplify the code.
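One way such a fold could look is sketched below. This is an assumption about the shape of the change, not the actual TiFlash code: `AggContextSketch`, its integer "rows" and its bucket layout are hypothetical. The idea is that when there is nothing to read, the concurrency query still reports one reader and the read call reports end of data at once, so the caller needs no separate null-source branch.

```cpp
#include <cstddef>
#include <optional>
#include <utility>
#include <vector>

// Hypothetical, simplified context. The method names mirror the discussion,
// but the bodies are assumptions made for illustration only.
class AggContextSketch
{
public:
    explicit AggContextSketch(std::vector<std::vector<int>> buckets_)
        : buckets(std::move(buckets_))
        , cursors(buckets.size(), 0)
    {}

    // With no data, still report one source so the pipeline gets a single
    // reader that finishes immediately (the former null-source case).
    std::size_t getConvergentConcurrency() const
    {
        return buckets.empty() ? 1 : buckets.size();
    }

    // Returns the next value of the given bucket, or nullopt when the bucket
    // is exhausted or does not exist (which covers the empty case as well).
    std::optional<int> readForConvergent(std::size_t index)
    {
        if (index >= buckets.size() || cursors[index] >= buckets[index].size())
            return std::nullopt;
        return buckets[index][cursors[index]++];
    }

private:
    std::vector<std::vector<int>> buckets;
    std::vector<std::size_t> cursors;
};
```

With this shape, the pipeline builder can always create `getConvergentConcurrency()` sources and let each one drain `readForConvergent` until it returns nothing.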
LGTM
@xzhangxian1008: Thanks for your review. The bot only counts approvals from reviewers and higher roles in list, but you're still welcome to leave your comments.
/merge
@SeaRise: It seems you want to merge this PR, I will help you trigger all the tests: /run-all-tests
This pull request has been accepted and is ready to merge. Commit hash: 3d49d23
/merge
This pull request has been accepted and is ready to merge. Commit hash: 7b0b566
What problem does this PR solve?
Issue Number: ref #6518
Problem Summary:
What is changed and how it works?
Wait for #7195.
Split Aggregator.executeOnBlock into Aggregator.executeOnBlock and Aggregator.spill in order to support the CPU/IO thread pools in the pipeline model.
enable_memory_tracker in ExecutorTestUtils: the aggregator does not depend on the memory tracker now.
LocalAggregateRestorer: supports restoring spilled data from disk for local aggregation.
LocalAggregateTransformOp: add spill/final_spill/restore phases.
Tests: gtest_spill_aggregation.cpp
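The phases listed above, together with the executeOnBlock/spill split, might be pictured as a small state machine. The sketch below is hypothetical: `LocalAggSketch`, the row-count threshold, and the exact transitions are assumptions for illustration, and only the phase names come from this PR. It shows why separating the CPU-bound build step from the IO-bound spill step lets a scheduler decide which thread pool should run the operator next.

```cpp
#include <cassert>
#include <cstddef>

enum class Pool { cpu, io };
enum class AggStatus { build, spill, final_spill, restore, convergent };

// Hypothetical operator model; rows are counted instead of really aggregated.
struct LocalAggSketch
{
    AggStatus status = AggStatus::build;
    std::size_t in_memory_rows = 0;
    std::size_t spilled_rows = 0;
    std::size_t max_rows_in_memory; // assumed spill threshold

    explicit LocalAggSketch(std::size_t limit) : max_rows_in_memory(limit) {}

    // CPU step: aggregate in memory only; request a spill when over the limit.
    void executeOnBlock(std::size_t rows)
    {
        assert(status == AggStatus::build);
        in_memory_rows += rows;
        if (in_memory_rows > max_rows_in_memory)
            status = AggStatus::spill;
    }

    // IO step: move in-memory data to disk, then resume building, or start
    // restoring if this was the final spill.
    void spill()
    {
        assert(status == AggStatus::spill || status == AggStatus::final_spill);
        spilled_rows += in_memory_rows;
        in_memory_rows = 0;
        status = (status == AggStatus::spill) ? AggStatus::build : AggStatus::restore;
    }

    // Called once the upstream is exhausted.
    void finishBuild()
    {
        assert(status == AggStatus::build);
        if (spilled_rows == 0)
            status = AggStatus::convergent; // nothing on disk: read from memory
        else
            status = in_memory_rows > 0 ? AggStatus::final_spill : AggStatus::restore;
    }

    // Which pool the scheduler should use for the next step.
    Pool requiredPool() const
    {
        switch (status)
        {
        case AggStatus::spill:
        case AggStatus::final_spill:
        case AggStatus::restore:
            return Pool::io;
        default:
            return Pool::cpu;
        }
    }
};
```

A driver loop would call `requiredPool()` after each step and hand the operator to the matching pool, so disk writes and reads never block a CPU worker.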
Check List
Tests
Side effects
Documentation
Release note