Skip to content

Commit

Permalink
fix mpp test
Browse files Browse the repository at this point in the history
  • Loading branch information
ywqzzy committed Mar 3, 2023
1 parent 6c6b966 commit 40a9d8b
Show file tree
Hide file tree
Showing 8 changed files with 42 additions and 18 deletions.
2 changes: 0 additions & 2 deletions dbms/src/Flash/Planner/Plans/PhysicalAggregation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,6 @@ void PhysicalAggregation::buildBlockInputStreamImpl(DAGPipeline & pipeline, Cont
void PhysicalAggregation::buildPipeline(PipelineBuilder & builder)
{
auto aggregate_context = std::make_shared<AggregateContext>(
is_final_agg,
log->identifier());
// TODO support fine grained shuffle.
assert(!fine_grained_shuffle.enable());
Expand All @@ -178,7 +177,6 @@ void PhysicalAggregation::buildPipeline(PipelineBuilder & builder)
aggregation_collators,
is_final_agg,
aggregate_descriptions,
expr_after_agg,
aggregate_context);
// Break the pipeline for agg_build.
auto agg_build_builder = builder.breakPipeline(agg_build);
Expand Down
3 changes: 0 additions & 3 deletions dbms/src/Flash/Planner/Plans/PhysicalAggregationBuild.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,13 @@ class PhysicalAggregationBuild : public PhysicalUnary
const TiDB::TiDBCollators & aggregation_collators_,
bool is_final_agg_,
const AggregateDescriptions & aggregate_descriptions_,
const ExpressionActionsPtr & expr_after_agg_,
const AggregateContextPtr & aggregate_context_)
: PhysicalUnary(executor_id_, PlanType::AggregationBuild, schema_, req_id, child_)
, before_agg_actions(before_agg_actions_)
, aggregation_keys(aggregation_keys_)
, aggregation_collators(aggregation_collators_)
, is_final_agg(is_final_agg_)
, aggregate_descriptions(aggregate_descriptions_)
, expr_after_agg(expr_after_agg_)
, aggregate_context(aggregate_context_)
{}

Expand All @@ -60,7 +58,6 @@ class PhysicalAggregationBuild : public PhysicalUnary
TiDB::TiDBCollators aggregation_collators;
bool is_final_agg;
AggregateDescriptions aggregate_descriptions;
ExpressionActionsPtr expr_after_agg;
AggregateContextPtr aggregate_context;
};
} // namespace DB
25 changes: 17 additions & 8 deletions dbms/src/Operators/AggregateContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,23 +44,27 @@ void AggregateContext::executeOnBlock(size_t task_index, const Block & block)
threads_data[task_index].src_rows += block.rows();
}

void AggregateContext::initConvergent()
void AggregateContext::writeSuffix()
{
RUNTIME_CHECK(inited_build && !inited_convergent);
size_t total_src_rows = 0;
size_t total_src_bytes = 0;
size_t rows = 0;
for (size_t i = 0; i < max_threads; ++i)
{
rows += many_data[i]->size();
size_t rows = many_data[i]->size();
LOG_TRACE(
log,
"Aggregated. {} to {} rows (from {:.3f} MiB))",
threads_data[i].src_rows,
rows,
(threads_data[i].src_bytes / 1048576.0));
total_src_rows += threads_data[i].src_rows;
total_src_bytes += threads_data[i].src_bytes;
}

LOG_TRACE(
log,
"Total aggregated {} rows to {} rows (from {:.3f} MiB))",
"Total aggregated {} rows (from {:.3f} MiB))",
total_src_rows,
rows,
(total_src_bytes / 1048576.0));

if (total_src_rows == 0 && keys_size == 0 && !empty_result_for_aggregation_by_empty_set)
Expand All @@ -69,8 +73,13 @@ void AggregateContext::initConvergent()
*many_data[0],
threads_data[0].key_columns,
threads_data[0].aggregate_columns);
}

void AggregateContext::initConvergent()
{
RUNTIME_CHECK(inited_build && !inited_convergent);

merging_buckets = aggregator->mergeAndConvertToBlocks(many_data, is_final, max_threads);
merging_buckets = aggregator->mergeAndConvertToBlocks(many_data, true, max_threads);
inited_convergent = true;
RUNTIME_CHECK(!merging_buckets || merging_buckets->getConcurrency() > 0);
}
Expand All @@ -85,7 +94,7 @@ size_t AggregateContext::getConvergentConcurrency()
Block AggregateContext::getHeader() const
{
RUNTIME_CHECK(inited_build);
return aggregator->getHeader(is_final);
return aggregator->getHeader(true);
}

bool AggregateContext::isTwoLevel()
Expand Down
9 changes: 4 additions & 5 deletions dbms/src/Operators/AggregateContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,9 @@ struct ThreadData
class AggregateContext
{
public:
AggregateContext(
bool final_,
explicit AggregateContext(
const String & req_id)
: is_final(final_)
, log(Logger::get(req_id))
: log(Logger::get(req_id))
{
}

Expand All @@ -52,6 +50,8 @@ class AggregateContext

void initConvergent();

void writeSuffix();

size_t getConvergentConcurrency();

Block readForConvergent(size_t index);
Expand All @@ -75,7 +75,6 @@ class AggregateContext
ManyAggregatedDataVariants many_data;
std::vector<ThreadData> threads_data;
size_t max_threads{};
bool is_final{};

const LoggerPtr log;
};
Expand Down
6 changes: 6 additions & 0 deletions dbms/src/Operators/AggregateConvergentSourceOp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,13 @@ namespace DB
OperatorStatus AggregateConvergentSourceOp::readImpl(Block & block)
{
block = agg_context->readForConvergent(index);
total_rows += block.rows();
return OperatorStatus::HAS_OUTPUT;
}

void AggregateConvergentSourceOp::operateSuffix()
{
LOG_INFO(log, "finish read {} rows from aggregate context", total_rows);
}

} // namespace DB
3 changes: 3 additions & 0 deletions dbms/src/Operators/AggregateConvergentSourceOp.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,14 @@ class AggregateConvergentSourceOp : public SourceOp
return "AggregateConvergentSourceOp";
}

void operateSuffix() override;

protected:
OperatorStatus readImpl(Block & block) override;

private:
AggregateContextPtr agg_context;
uint64_t total_rows{};
const size_t index;
};
} // namespace DB
8 changes: 8 additions & 0 deletions dbms/src/Operators/AggregateSinkOp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,16 @@ OperatorStatus AggregateSinkOp::writeImpl(Block && block)
return OperatorStatus::FINISHED;
}
agg_context->executeOnBlock(index, block);
total_rows += block.rows();
block.clear();
return OperatorStatus::NEED_INPUT;
}

void AggregateSinkOp::operateSuffix()
{
LOG_DEBUG(log, "finish write with {} rows", total_rows);

agg_context->writeSuffix();
}

} // namespace DB
4 changes: 4 additions & 0 deletions dbms/src/Operators/AggregateSinkOp.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,14 @@ class AggregateSinkOp : public SinkOp
return "AggregateSinkOp";
}

void operateSuffix() override;

protected:
OperatorStatus writeImpl(Block && block) override;

private:
size_t index{};
uint64_t total_rows{};
AggregateContextPtr agg_context;
};
} // namespace DB

0 comments on commit 40a9d8b

Please sign in to comment.