Skip to content

Commit

Permalink
Support using FineGrainedShuffle Info for Join&Agg (#6279)
Browse files Browse the repository at this point in the history
ref #6157
  • Loading branch information
yibin87 authored Nov 15, 2022
1 parent 6f7e751 commit e77d01c
Show file tree
Hide file tree
Showing 23 changed files with 313 additions and 112 deletions.
1 change: 1 addition & 0 deletions dbms/src/DataStreams/MockExchangeReceiverInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ MockExchangeReceiverInputStream::MockExchangeReceiverInputStream(const tipb::Exc
: output_index(0)
, max_block_size(max_block_size)
, rows(rows_)
, source_num(static_cast<size_t>(receiver.encoded_task_meta_size()))
{
for (int i = 0; i < receiver.field_types_size(); ++i)
{
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/DataStreams/MockExchangeReceiverInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,12 @@ class MockExchangeReceiverInputStream : public IProfilingBlockInputStream
return Block(columns);
}
String getName() const override { return "MockExchangeReceiver"; }
size_t getSourceNum() const { return source_num; }
ColumnsWithTypeAndName columns;
size_t output_index;
size_t max_block_size;
size_t rows;
size_t source_num = 0;

protected:
Block readImpl() override;
Expand Down
38 changes: 30 additions & 8 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ struct AnalysisResult
TiDB::TiDBCollators aggregation_collators;
AggregateDescriptions aggregate_descriptions;
bool is_final_agg = false;
bool enable_fine_grained_shuffle_agg = false;
};

AnalysisResult analyzeExpressions(
Expand All @@ -112,6 +113,7 @@ AnalysisResult analyzeExpressions(
if (query_block.aggregation)
{
res.is_final_agg = AggregationInterpreterHelper::isFinalAgg(query_block.aggregation->aggregation());
res.enable_fine_grained_shuffle_agg = enableFineGrainedShuffle(query_block.aggregation->fine_grained_shuffle_stream_count());

std::tie(res.aggregation_keys, res.aggregation_collators, res.aggregate_descriptions, res.before_aggregation) = analyzer.appendAggregation(
chain,
Expand Down Expand Up @@ -189,7 +191,7 @@ void DAGQueryBlockInterpreter::handleTableScan(const TiDBTableScan & table_scan,
analyzer = std::move(storage_interpreter.analyzer);
}

void DAGQueryBlockInterpreter::handleJoin(const tipb::Join & join, DAGPipeline & pipeline, SubqueryForSet & right_query)
void DAGQueryBlockInterpreter::handleJoin(const tipb::Join & join, DAGPipeline & pipeline, SubqueryForSet & right_query, size_t fine_grained_shuffle_count)
{
if (unlikely(input_streams_vec.size() != 2))
{
Expand Down Expand Up @@ -253,6 +255,8 @@ void DAGQueryBlockInterpreter::handleJoin(const tipb::Join & join, DAGPipeline &
tiflash_join.kind,
tiflash_join.strictness,
log->identifier(),
enableFineGrainedShuffle(fine_grained_shuffle_count),
fine_grained_shuffle_count,
tiflash_join.join_key_collators,
probe_filter_column_name,
build_filter_column_name,
Expand All @@ -273,11 +277,13 @@ void DAGQueryBlockInterpreter::handleJoin(const tipb::Join & join, DAGPipeline &
// add a HashJoinBuildBlockInputStream to build a shared hash table
auto build_streams = [&](BlockInputStreams & streams) {
size_t build_index = 0;
auto extra_info = fmt::format("join build, build_side_root_executor_id = {}", dagContext().getJoinExecuteInfoMap()[query_block.source_name].build_side_root_executor_id);
if (enableFineGrainedShuffle(fine_grained_shuffle_count))
extra_info = fmt::format("{} {}", extra_info, String(enableFineGrainedShuffleExtraInfo));
for (auto & stream : streams)
{
stream = std::make_shared<HashJoinBuildBlockInputStream>(stream, join_ptr, build_index++, log->identifier());
stream->setExtraInfo(
fmt::format("join build, build_side_root_executor_id = {}", dagContext().getJoinExecuteInfoMap()[query_block.source_name].build_side_root_executor_id));
stream->setExtraInfo(extra_info);
join_execute_info.join_build_streams.push_back(stream);
}
};
Expand Down Expand Up @@ -381,7 +387,8 @@ void DAGQueryBlockInterpreter::executeAggregation(
const Names & key_names,
const TiDB::TiDBCollators & collators,
AggregateDescriptions & aggregate_descriptions,
bool is_final_agg)
bool is_final_agg,
bool enable_fine_grained_shuffle)
{
executeExpression(pipeline, expression_actions_ptr, log, "before aggregation");

Expand All @@ -397,9 +404,24 @@ void DAGQueryBlockInterpreter::executeAggregation(
aggregate_descriptions,
is_final_agg);

/// If there are several sources, then we perform parallel aggregation
if (pipeline.streams.size() > 1 || pipeline.streams_with_non_joined_data.size() > 1)
if (enable_fine_grained_shuffle)
{
/// Go straight forward without merging phase when enable_fine_grained_shuffle
RUNTIME_CHECK(pipeline.streams_with_non_joined_data.empty());
pipeline.transform([&](auto & stream) {
stream = std::make_shared<AggregatingBlockInputStream>(
stream,
params,
context.getFileProvider(),
true,
log->identifier());
stream->setExtraInfo(String(enableFineGrainedShuffleExtraInfo));
});
recordProfileStreams(pipeline, query_block.aggregation_name);
}
else if (pipeline.streams.size() > 1 || pipeline.streams_with_non_joined_data.size() > 1)
{
/// If there are several sources, then we perform parallel aggregation
const Settings & settings = context.getSettingsRef();
BlockInputStreamPtr stream = std::make_shared<ParallelAggregatingBlockInputStream>(
pipeline.streams,
Expand Down Expand Up @@ -589,7 +611,7 @@ void DAGQueryBlockInterpreter::executeImpl(DAGPipeline & pipeline)
if (query_block.source->tp() == tipb::ExecType::TypeJoin)
{
SubqueryForSet right_query;
handleJoin(query_block.source->join(), pipeline, right_query);
handleJoin(query_block.source->join(), pipeline, right_query, query_block.source->fine_grained_shuffle_stream_count());
recordProfileStreams(pipeline, query_block.source_name);
dagContext().addSubquery(query_block.source_name, std::move(right_query));
}
Expand Down Expand Up @@ -661,7 +683,7 @@ void DAGQueryBlockInterpreter::executeImpl(DAGPipeline & pipeline)
if (res.before_aggregation)
{
// execute aggregation
executeAggregation(pipeline, res.before_aggregation, res.aggregation_keys, res.aggregation_collators, res.aggregate_descriptions, res.is_final_agg);
executeAggregation(pipeline, res.before_aggregation, res.aggregation_keys, res.aggregation_collators, res.aggregate_descriptions, res.is_final_agg, res.enable_fine_grained_shuffle_agg);
}
if (res.before_having)
{
Expand Down
5 changes: 3 additions & 2 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class DAGQueryBlockInterpreter
void executeImpl(DAGPipeline & pipeline);
void handleMockTableScan(const TiDBTableScan & table_scan, DAGPipeline & pipeline);
void handleTableScan(const TiDBTableScan & table_scan, DAGPipeline & pipeline);
void handleJoin(const tipb::Join & join, DAGPipeline & pipeline, SubqueryForSet & right_query);
void handleJoin(const tipb::Join & join, DAGPipeline & pipeline, SubqueryForSet & right_query, size_t fine_grained_shuffle_count);
void handleExchangeReceiver(DAGPipeline & pipeline);
void handleMockExchangeReceiver(DAGPipeline & pipeline);
void handleProjection(DAGPipeline & pipeline, const tipb::Projection & projection);
Expand All @@ -80,7 +80,8 @@ class DAGQueryBlockInterpreter
const Names & key_names,
const TiDB::TiDBCollators & collators,
AggregateDescriptions & aggregate_descriptions,
bool is_final_agg);
bool is_final_agg,
bool enable_fine_grained_shuffle);
void executeProject(DAGPipeline & pipeline, NamesWithAliases & project_cols, const String & extra_info = "");
void handleExchangeSender(DAGPipeline & pipeline);
void handleMockExchangeSender(DAGPipeline & pipeline);
Expand Down
14 changes: 7 additions & 7 deletions dbms/src/Flash/Mpp/ExchangeReceiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -416,20 +416,20 @@ ExchangeReceiverBase<RPCContext>::ExchangeReceiverBase(
uint64_t fine_grained_shuffle_stream_count_)
: rpc_context(std::move(rpc_context_))
, source_num(source_num_)
, max_streams(max_streams_)
, enable_fine_grained_shuffle_flag(enableFineGrainedShuffle(fine_grained_shuffle_stream_count_))
, output_stream_count(enable_fine_grained_shuffle_flag ? std::min(max_streams_, fine_grained_shuffle_stream_count_) : max_streams_)
, max_buffer_size(std::max<size_t>(batch_packet_count, std::max(source_num, max_streams_) * 2))
, thread_manager(newThreadManager())
, live_connections(source_num)
, state(ExchangeReceiverState::NORMAL)
, exc_log(Logger::get(req_id, executor_id))
, collected(false)
, fine_grained_shuffle_stream_count(fine_grained_shuffle_stream_count_)
{
try
{
if (enableFineGrainedShuffle(fine_grained_shuffle_stream_count_))
if (enable_fine_grained_shuffle_flag)
{
for (size_t i = 0; i < max_streams_; ++i)
for (size_t i = 0; i < output_stream_count; ++i)
{
msg_channels.push_back(std::make_unique<MPMCQueue<std::shared_ptr<ReceivedMessage>>>(max_buffer_size));
}
Expand Down Expand Up @@ -498,7 +498,7 @@ void ExchangeReceiverBase<RPCContext>::setUpConnection()
else
{
thread_manager->schedule(true, "Receiver", [this, req = std::move(req)] {
if (enableFineGrainedShuffle(fine_grained_shuffle_stream_count))
if (enable_fine_grained_shuffle_flag)
readLoop<true>(req);
else
readLoop<false>(req);
Expand All @@ -511,7 +511,7 @@ void ExchangeReceiverBase<RPCContext>::setUpConnection()
if (!async_requests.empty())
{
thread_manager->schedule(true, "RecvReactor", [this, async_requests = std::move(async_requests)] {
if (enableFineGrainedShuffle(fine_grained_shuffle_stream_count))
if (enable_fine_grained_shuffle_flag)
reactor<true>(async_requests);
else
reactor<false>(async_requests);
Expand Down Expand Up @@ -787,7 +787,7 @@ ExchangeReceiverResult ExchangeReceiverBase<RPCContext>::toDecodeResult(
assert(recv_msg->chunks.empty());
// Fine grained shuffle should only be enabled when sending data to TiFlash node.
// So all data should be encoded into MPPDataPacket.chunks.
RUNTIME_CHECK_MSG(!enableFineGrainedShuffle(fine_grained_shuffle_stream_count), "Data should not be encoded into tipb::SelectResponse.chunks when fine grained shuffle is enabled");
RUNTIME_CHECK_MSG(!enable_fine_grained_shuffle_flag, "Data should not be encoded into tipb::SelectResponse.chunks when fine grained shuffle is enabled");
result.decode_detail = CoprocessorReader::decodeChunks(select_resp, block_queue, header, schema);
}
else if (!recv_msg->chunks.empty())
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Flash/Mpp/ExchangeReceiver.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ class ExchangeReceiverBase
std::unique_ptr<CHBlockChunkDecodeAndSquash> & decoder_ptr);

size_t getSourceNum() const { return source_num; }
uint64_t getFineGrainedShuffleStreamCount() const { return fine_grained_shuffle_stream_count; }
uint64_t getFineGrainedShuffleStreamCount() const { return enable_fine_grained_shuffle_flag ? output_stream_count : 0; }

int computeNewThreadCount() const { return thread_count; }

Expand Down Expand Up @@ -208,7 +208,8 @@ class ExchangeReceiverBase
const tipb::ExchangeReceiver pb_exchange_receiver;
const size_t source_num;
const ::mpp::TaskMeta task_meta;
const size_t max_streams;
const bool enable_fine_grained_shuffle_flag;
const size_t output_stream_count;
const size_t max_buffer_size;

std::shared_ptr<ThreadManager> thread_manager;
Expand All @@ -226,7 +227,6 @@ class ExchangeReceiverBase

bool collected = false;
int thread_count = 0;
uint64_t fine_grained_shuffle_stream_count;
};

class ExchangeReceiver : public ExchangeReceiverBase<GRPCReceiverContext>
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Mpp/FineGrainedShuffleWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ void FineGrainedShuffleWriter<ExchangeWriterPtr>::batchWriteFineGrainedShuffle()
while (!blocks.empty())
{
const auto & block = blocks.back();
HashBaseWriterHelper::scatterColumnsInplace(block, num_bucket, collators, partition_key_containers_for_reuse, partition_col_ids, hash, selector, scattered);
HashBaseWriterHelper::scatterColumnsForFineGrainedShuffle(block, partition_col_ids, collators, partition_key_containers_for_reuse, partition_num, fine_grained_shuffle_stream_count, hash, selector, scattered);
blocks.pop_back();
}

Expand Down
121 changes: 87 additions & 34 deletions dbms/src/Flash/Mpp/HashBaseWriterHelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,44 +41,94 @@ std::vector<MutableColumns> createDestColumns(const Block & sample_block, size_t
return dest_tbl_cols;
}

/// Build the row -> partition selector from precomputed hash values.
///
/// The 32-bit hash range is cut into `part_num` equal intervals and each row is
/// assigned the index of the interval its hash falls into, i.e. the most
/// significant bits of the hash decide the partition.
///
/// @param rows      number of rows to fill; `selector` is resized to this length.
/// @param hash      per-row hash values (read through `hash.getData()`).
/// @param part_num  number of target partitions.
/// @param selector  output; after the call `selector[i]` lies in [0, part_num).
void fillSelector(size_t rows,
                  const WeakHash32 & hash,
                  uint32_t part_num,
                  IColumn::Selector & selector)
{
    const auto & hash_values = hash.getData();
    selector.resize(rows);
    for (size_t row = 0; row < rows; ++row)
    {
        /// Work directly on the selector slot so the arithmetic is carried out in the
        /// selector's own element type (selector stores 64 bit values).
        auto & slot = selector[row];
        slot = hash_values[row]; /// [0, 2^32)
        slot *= part_num; /// [0, part_num * 2^32)
        slot >>= 32u; /// [0, part_num)
    }
}

/// Build the row -> (partition, stream) selector used by FineGrainedShuffle.
///
/// The selector algorithm must satisfy one requirement: the
/// FineGrainedShuffleStreamIndex has to be computable from hash_data and
/// fine_grained_shuffle_stream_count alone, without the presence of part_num.
/// Here that index is `hash % fine_grained_shuffle_stream_count`.
///
/// The value written for each row is
///     partition_index * fine_grained_shuffle_stream_count + stream_index
/// where partition_index is taken from the most significant bits of the hash
/// (range [0, part_num)), so the result lies in
/// [0, part_num * fine_grained_shuffle_stream_count).
///
/// @param rows      number of rows to fill; `selector` is resized to this length.
/// @param hash      per-row hash values (read through `hash.getData()`).
/// @param part_num  number of target partitions.
/// @param fine_grained_shuffle_stream_count  streams per partition; used as a
///                  modulus, so it must be non-zero.
/// @param selector  output selector.
void fillSelectorForFineGrainedShuffle(size_t rows,
                                       const WeakHash32 & hash,
                                       uint32_t part_num,
                                       uint32_t fine_grained_shuffle_stream_count,
                                       IColumn::Selector & selector)
{
    const auto & hash_values = hash.getData();
    selector.resize(rows);
    for (size_t row = 0; row < rows; ++row)
    {
        /// Work directly on the selector slot so the arithmetic is carried out in the
        /// selector's own element type (selector stores 64 bit values).
        auto & slot = selector[row];

        /// Stage 1: partition index from the high bits of the hash.
        slot = hash_values[row]; /// [0, 2^32)
        slot *= part_num; /// [0, part_num * 2^32)
        slot >>= 32u; /// [0, part_num)

        /// Stage 2: append the stream index, which depends only on the hash and the stream count.
        slot = slot * fine_grained_shuffle_stream_count + hash_values[row] % fine_grained_shuffle_stream_count; /// [0, part_num * fine_grained_shuffle_stream_count)
    }
}

void computeHash(const Block & block,
uint32_t num_bucket,
const std::vector<Int64> & partition_col_ids,
const TiDB::TiDBCollators & collators,
std::vector<String> & partition_key_containers,
const std::vector<Int64> & partition_col_ids,
WeakHash32 & hash,
IColumn::Selector & selector)
WeakHash32 & hash)
{
size_t num_rows = block.rows();
// compute hash values
size_t rows = block.rows();
if unlikely (rows == 0)
return;

hash.getData().resize(rows);
hash.reset(rows);
/// compute hash values
for (size_t i = 0; i < partition_col_ids.size(); ++i)
{
const auto & column = block.getByPosition(partition_col_ids[i]).column;
column->updateWeakHash32(hash, collators[i], partition_key_containers[i]);
}
}

// fill selector array with most significant bits of hash values
const auto & hash_data = hash.getData();
for (size_t i = 0; i < num_rows; ++i)
{
/// Row from interval [(2^32 / num_bucket) * i, (2^32 / num_bucket) * (i + 1)) goes to bucket with number i.
selector[i] = hash_data[i]; /// [0, 2^32)
selector[i] *= num_bucket; /// [0, num_bucket * 2^32), selector stores 64 bit values.
selector[i] >>= 32u; /// [0, num_bucket)
}
/// Compute the per-row hash over an explicit list of key columns.
///
/// Does nothing (and leaves `hash` untouched) when `rows` is zero. Otherwise the
/// hash buffer is resized and reset for `rows` entries, and every key column is
/// applied in order through `updateWeakHash32`.
///
/// @param rows                      number of rows in the key columns.
/// @param key_columns               columns taking part in the hash.
/// @param collators                 indexed in step with `key_columns`.
/// @param partition_key_containers  indexed in step with `key_columns`;
///                                  presumably scratch buffers for the collators — TODO confirm.
/// @param hash                      output hash values.
void computeHash(size_t rows,
                 const ColumnRawPtrs & key_columns,
                 const TiDB::TiDBCollators & collators,
                 std::vector<String> & partition_key_containers,
                 WeakHash32 & hash)
{
    if (unlikely(rows == 0))
        return;

    hash.getData().resize(rows);
    hash.reset(rows);

    const size_t key_count = key_columns.size();
    for (size_t key_idx = 0; key_idx < key_count; ++key_idx)
    {
        const auto & key_column = key_columns[key_idx];
        key_column->updateWeakHash32(hash, collators[key_idx], partition_key_containers[key_idx]);
    }
}

void scatterColumns(const Block & input_block,
uint32_t bucket_num,
const std::vector<Int64> & partition_col_ids,
const TiDB::TiDBCollators & collators,
std::vector<String> & partition_key_containers,
const std::vector<Int64> & partition_col_ids,
uint32_t bucket_num,
std::vector<std::vector<MutableColumnPtr>> & result_columns)
{
size_t rows = input_block.rows();
WeakHash32 hash(rows);
IColumn::Selector selector(rows);
computeHash(input_block, bucket_num, collators, partition_key_containers, partition_col_ids, hash, selector);
if unlikely (input_block.rows() == 0)
return;

WeakHash32 hash(0);
computeHash(input_block, partition_col_ids, collators, partition_key_containers, hash);

IColumn::Selector selector;
fillSelector(input_block.rows(), hash, bucket_num, selector);

for (size_t col_id = 0; col_id < input_block.columns(); ++col_id)
{
Expand All @@ -101,21 +151,24 @@ DB::TrackedMppDataPacketPtrs createPackets(size_t partition_num)
return tracked_packets;
}

void scatterColumnsInplace(const Block & block,
uint32_t bucket_num,
const TiDB::TiDBCollators & collators,
std::vector<String> & partition_key_containers,
const std::vector<Int64> & partition_col_ids,
WeakHash32 & hash,
IColumn::Selector & selector,
std::vector<IColumn::ScatterColumns> & scattered)
void scatterColumnsForFineGrainedShuffle(const Block & block,
const std::vector<Int64> & partition_col_ids,
const TiDB::TiDBCollators & collators,
std::vector<String> & partition_key_containers,
uint32_t part_num,
uint32_t fine_grained_shuffle_stream_count,
WeakHash32 & hash,
IColumn::Selector & selector,
std::vector<IColumn::ScatterColumns> & scattered)
{
size_t num_rows = block.rows();
if unlikely (block.rows() == 0)
return;

// compute hash values
hash.getData().resize(num_rows);
hash.reset(num_rows);
selector.resize(num_rows);
computeHash(block, bucket_num, collators, partition_key_containers, partition_col_ids, hash, selector);
computeHash(block, partition_col_ids, collators, partition_key_containers, hash);

/// fill selector using computed hash
fillSelectorForFineGrainedShuffle(block.rows(), hash, part_num, fine_grained_shuffle_stream_count, selector);

// partition
for (size_t i = 0; i < block.columns(); ++i)
Expand Down
Loading

0 comments on commit e77d01c

Please sign in to comment.