diff --git a/dbms/src/Common/TiFlashMetrics.h b/dbms/src/Common/TiFlashMetrics.h index 92e250ac748..86b9e4fad0f 100644 --- a/dbms/src/Common/TiFlashMetrics.h +++ b/dbms/src/Common/TiFlashMetrics.h @@ -389,7 +389,6 @@ namespace DB F(type_cpu, {{"type", "cpu"}}, ExpBuckets{0.005, 2, 20}), \ F(type_io, {{"type", "io"}}, ExpBuckets{0.005, 2, 20})) \ M(tiflash_pipeline_task_change_to_status, "pipeline task change to status", Counter, \ - F(type_to_init, {"type", "to_init"}), \ F(type_to_waiting, {"type", "to_waiting"}), \ F(type_to_running, {"type", "to_running"}), \ F(type_to_io, {"type", "to_io"}), \ diff --git a/dbms/src/Core/SpillConfig.cpp b/dbms/src/Core/SpillConfig.cpp index d5a63d9adc9..13412bd69f8 100644 --- a/dbms/src/Core/SpillConfig.cpp +++ b/dbms/src/Core/SpillConfig.cpp @@ -27,7 +27,15 @@ bool needReplace(char c) return std::isspace(c) || String::npos != forbidden_or_unusual_chars.find(c); } } // namespace -SpillConfig::SpillConfig(const DB::String & spill_dir_, const DB::String & spill_id_, size_t max_cached_data_bytes_in_spiller_, size_t max_spilled_rows_per_file_, size_t max_spilled_bytes_per_file_, const FileProviderPtr & file_provider_) +SpillConfig::SpillConfig( + const DB::String & spill_dir_, + const DB::String & spill_id_, + size_t max_cached_data_bytes_in_spiller_, + size_t max_spilled_rows_per_file_, + size_t max_spilled_bytes_per_file_, + const FileProviderPtr & file_provider_, + UInt64 for_all_constant_max_streams_, + UInt64 for_all_constant_block_size_) : spill_dir(spill_dir_) , spill_id(spill_id_) , spill_id_as_file_name_prefix(spill_id) @@ -35,6 +43,8 @@ SpillConfig::SpillConfig(const DB::String & spill_dir_, const DB::String & spill , max_spilled_rows_per_file(max_spilled_rows_per_file_) , max_spilled_bytes_per_file(max_spilled_bytes_per_file_) , file_provider(file_provider_) + , for_all_constant_max_streams(std::max(1, for_all_constant_max_streams_)) + , for_all_constant_block_size(std::max(1, for_all_constant_block_size_)) { RUNTIME_CHECK_MSG(!spill_dir.empty(), "Spiller dir must be non-empty"); RUNTIME_CHECK_MSG(!spill_id.empty(), "Spiller id must be non-empty"); diff --git a/dbms/src/Core/SpillConfig.h b/dbms/src/Core/SpillConfig.h index 144c4043e0d..b604719bf5e 100644 --- a/dbms/src/Core/SpillConfig.h +++ b/dbms/src/Core/SpillConfig.h @@ -14,6 +14,7 @@ #pragma once +#include #include #include @@ -23,7 +24,15 @@ namespace DB struct SpillConfig { public: - SpillConfig(const String & spill_dir_, const String & spill_id_, size_t max_cached_data_bytes_in_spiller_, size_t max_spilled_rows_per_file_, size_t max_spilled_bytes_per_file_, const FileProviderPtr & file_provider_); + SpillConfig( + const String & spill_dir_, + const String & spill_id_, + size_t max_cached_data_bytes_in_spiller_, + size_t max_spilled_rows_per_file_, + size_t max_spilled_bytes_per_file_, + const FileProviderPtr & file_provider_, + UInt64 for_all_constant_max_streams_ = 1, + UInt64 for_all_constant_block_size_ = DEFAULT_BLOCK_SIZE); String spill_dir; String spill_id; String spill_id_as_file_name_prefix; @@ -34,5 +43,8 @@ struct SpillConfig /// soft limit of the max bytes per spilled file UInt64 max_spilled_bytes_per_file; FileProviderPtr file_provider; + + UInt64 for_all_constant_max_streams; + UInt64 for_all_constant_block_size; }; } // namespace DB diff --git a/dbms/src/Core/SpillHandler.cpp b/dbms/src/Core/SpillHandler.cpp index 3f051b52999..683eb507c9f 100644 --- a/dbms/src/Core/SpillHandler.cpp +++ b/dbms/src/Core/SpillHandler.cpp @@ -79,49 +79,64 @@ void 
SpillHandler::spillBlocks(Blocks && blocks) /// todo check the disk usage if (unlikely(blocks.empty())) return; + RUNTIME_CHECK_MSG(current_spilled_file_index != INVALID_CURRENT_SPILLED_FILE_INDEX, "{}: spill after the spill handler meeting error or finished.", spiller->config.spill_id); + RUNTIME_CHECK_MSG(spiller->isSpillFinished() == false, "{}: spill after the spiller is finished.", spiller->config.spill_id); + try { - Stopwatch watch; - RUNTIME_CHECK_MSG(spiller->isSpillFinished() == false, "{}: spill after the spiller is finished.", spiller->config.spill_id); - auto block_size = blocks.size(); - LOG_DEBUG(spiller->logger, "Spilling {} blocks data", block_size); - FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_during_spill); - size_t total_rows = 0; - size_t rows_in_file = 0; - size_t bytes_in_file = 0; - for (auto & block : blocks) + if unlikely (spiller->isAllConstant()) { - if (unlikely(!block || block.rows() == 0)) - continue; - /// erase constant column - spiller->removeConstantColumns(block); - RUNTIME_CHECK_MSG(block.columns() > 0, "Try to spill blocks containing only constant columns, it is meaningless to spill blocks containing only constant columns"); - if (unlikely(writer == nullptr)) + LOG_WARNING(spiller->logger, "Try to spill blocks containing only constant columns, it is meaningless to spill blocks containing only constant columns"); + for (auto & block : blocks) { - std::tie(rows_in_file, bytes_in_file) = setUpNextSpilledFile(); + if (unlikely(!block || block.rows() == 0)) + continue; + all_constant_block_rows += block.rows(); } - auto rows = block.rows(); - total_rows += rows; - rows_in_file += rows; - bytes_in_file += block.estimateBytesForSpill(); - writer->write(block); - block.clear(); - if (spiller->enable_append_write && isSpilledFileFull(rows_in_file, bytes_in_file)) + } + else + { + Stopwatch watch; + auto block_size = blocks.size(); + LOG_DEBUG(spiller->logger, "Spilling {} blocks data", block_size); + + size_t total_rows = 0; + size_t rows_in_file = 0; + size_t bytes_in_file = 0; + for (auto & block : blocks) { - spilled_files[current_spilled_file_index]->updateSpillDetails(writer->finishWrite()); - spilled_files[current_spilled_file_index]->markFull(); - writer = nullptr; + if (unlikely(!block || block.rows() == 0)) + continue; + /// erase constant column + spiller->removeConstantColumns(block); + RUNTIME_CHECK(block.columns() > 0); + if (unlikely(writer == nullptr)) + { + std::tie(rows_in_file, bytes_in_file) = setUpNextSpilledFile(); + } + auto rows = block.rows(); + total_rows += rows; + rows_in_file += rows; + bytes_in_file += block.estimateBytesForSpill(); + writer->write(block); + block.clear(); + if (spiller->enable_append_write && isSpilledFileFull(rows_in_file, bytes_in_file)) + { + spilled_files[current_spilled_file_index]->updateSpillDetails(writer->finishWrite()); + spilled_files[current_spilled_file_index]->markFull(); + writer = nullptr; + } } + double cost = watch.elapsedSeconds(); + time_cost += cost; + LOG_DEBUG(spiller->logger, "Spilled {} rows from {} blocks into temporary file, time cost: {:.3f} sec.", total_rows, block_size, cost); + RUNTIME_CHECK_MSG(current_spilled_file_index != INVALID_CURRENT_SPILLED_FILE_INDEX, "{}: spill after the spill handler is finished.", spiller->config.spill_id); } - double cost = watch.elapsedSeconds(); - time_cost += cost; - LOG_DEBUG(spiller->logger, "Spilled {} rows from {} blocks into temporary file, time cost: {:.3f} sec.", total_rows, block_size, cost); - 
RUNTIME_CHECK_MSG(current_spilled_file_index != INVALID_CURRENT_SPILLED_FILE_INDEX, "{}: spill after the spill handler is finished.", spiller->config.spill_id); + RUNTIME_CHECK_MSG(spiller->isSpillFinished() == false, "{}: spill after the spiller is finished.", spiller->config.spill_id); - return; } catch (...) { @@ -174,5 +189,14 @@ void SpillHandler::finish() current_spilled_file_index = INVALID_CURRENT_SPILLED_FILE_INDEX; RUNTIME_CHECK_MSG(spiller->isSpillFinished() == false, "{}: spill after the spiller is finished.", spiller->config.spill_id); } + else if unlikely (spiller->isAllConstant()) + { + if (all_constant_block_rows > 0) + { + spiller->recordAllConstantBlockRows(partition_id, all_constant_block_rows); + spiller->has_spilled_data = true; + all_constant_block_rows = 0; + } + } } } // namespace DB diff --git a/dbms/src/Core/SpillHandler.h b/dbms/src/Core/SpillHandler.h index 51f1697036a..7491653eeaf 100644 --- a/dbms/src/Core/SpillHandler.h +++ b/dbms/src/Core/SpillHandler.h @@ -54,6 +54,7 @@ class SpillHandler }; Spiller * spiller; std::vector> spilled_files; + UInt64 all_constant_block_rows = 0; size_t partition_id; Int64 current_spilled_file_index; String current_spill_file_name; diff --git a/dbms/src/Core/Spiller.cpp b/dbms/src/Core/Spiller.cpp index 1ddaf1f45de..2e5367c0cf6 100644 --- a/dbms/src/Core/Spiller.cpp +++ b/dbms/src/Core/Spiller.cpp @@ -16,6 +16,7 @@ #include #include #include +#include #include #include #include @@ -78,16 +79,6 @@ void SpilledFiles::makeAllSpilledFilesImmutable() mutable_spilled_files.clear(); } -bool Spiller::supportSpill(const Block & header) -{ - for (const auto & column_with_type_and_name : header) - { - if (column_with_type_and_name.column == nullptr || !column_with_type_and_name.column->isColumnConst()) - return true; - } - return false; -} - Spiller::Spiller(const SpillConfig & config_, bool is_input_sorted_, UInt64 partition_num_, const Block & input_schema_, const LoggerPtr & logger_, Int64 spill_version_, bool release_spilled_file_on_restore_) : config(config_) , is_input_sorted(is_input_sorted_) @@ -98,7 +89,9 @@ Spiller::Spiller(const SpillConfig & config_, bool is_input_sorted_, UInt64 part , release_spilled_file_on_restore(release_spilled_file_on_restore_) { for (UInt64 i = 0; i < partition_num; ++i) + { spilled_files.push_back(std::make_unique()); + } /// if is_input_sorted is true, can not append write because it will break the sort property enable_append_write = !is_input_sorted && (config.max_spilled_bytes_per_file != 0 || config.max_spilled_rows_per_file != 0); Poco::File spill_dir(config.spill_dir); @@ -118,6 +111,14 @@ Spiller::Spiller(const SpillConfig & config_, bool is_input_sorted_, UInt64 part } header_without_constants = input_schema; removeConstantColumns(header_without_constants); + if (0 == header_without_constants.columns()) + { + LOG_WARNING(logger, "Try to spill blocks containing only constant columns, it is meaningless to spill blocks containing only constant columns"); + for (UInt64 i = 0; i < partition_num; ++i) + { + all_constant_block_rows.push_back(0); + } + } } void Spiller::removeConstantColumns(Block & block) const @@ -201,67 +202,105 @@ BlockInputStreams Spiller::restoreBlocks(UInt64 partition_id, UInt64 max_stream_ { RUNTIME_CHECK_MSG(partition_id < partition_num, "{}: partition id {} exceeds partition num {}.", config.spill_id, partition_id, partition_num); RUNTIME_CHECK_MSG(isSpillFinished(), "{}: restore before the spiller is finished.", config.spill_id); - std::lock_guard 
partition_lock(spilled_files[partition_id]->spilled_files_mutex); - RUNTIME_CHECK_MSG(spilled_files[partition_id]->mutable_spilled_files.empty(), "{}: the mutable spilled files must be empty when restore.", config.spill_id); - auto & partition_spilled_files = spilled_files[partition_id]->immutable_spilled_files; - if (max_stream_size == 0) - max_stream_size = partition_spilled_files.size(); - if (is_input_sorted && partition_spilled_files.size() > max_stream_size) - { - LOG_WARNING(logger, "Sorted spilled data restore does not take max_stream_size into account"); - } - - SpillDetails details{0, 0, 0}; BlockInputStreams ret; - UInt64 spill_file_read_stream_num = is_input_sorted ? partition_spilled_files.size() : std::min(max_stream_size, partition_spilled_files.size()); - std::vector restore_stream_read_rows; - - if (is_input_sorted) + if unlikely (isAllConstant()) { - for (auto & file : partition_spilled_files) + UInt64 total_rows = 0; { - RUNTIME_CHECK_MSG(file->exists(), "Spill file {} does not exists", file->path()); - details.merge(file->getSpillDetails()); - std::vector file_infos; - file_infos.emplace_back(file->path()); - restore_stream_read_rows.push_back(file->getSpillDetails().rows); + std::lock_guard lock(all_constant_mutex); + total_rows = all_constant_block_rows[partition_id]; if (release_spilled_file_on_restore) - file_infos.back().file = std::move(file); - ret.push_back(std::make_shared(std::move(file_infos), input_schema, header_without_constants, const_column_indexes, config.file_provider, spill_version)); + all_constant_block_rows[partition_id] = 0; + } + if (total_rows > 0) + { + if (max_stream_size == 0) + max_stream_size = config.for_all_constant_max_streams; + std::vector stream_rows; + stream_rows.resize(max_stream_size, 0); + size_t index = 0; + while (total_rows > 0) + { + auto cur_rows = std::min(total_rows, config.for_all_constant_block_size); + total_rows -= cur_rows; + stream_rows[index++] += cur_rows; + if (index == stream_rows.size()) + index = 0; + } + for (auto stream_row : stream_rows) + { + if (stream_row > 0) + ret.push_back(std::make_shared(input_schema, stream_row, config.for_all_constant_block_size)); + } } } else { - std::vector> file_infos(spill_file_read_stream_num); - restore_stream_read_rows.resize(spill_file_read_stream_num, 0); - // todo balance based on SpilledRows - for (size_t i = 0; i < partition_spilled_files.size(); ++i) + std::lock_guard partition_lock(spilled_files[partition_id]->spilled_files_mutex); + RUNTIME_CHECK_MSG(spilled_files[partition_id]->mutable_spilled_files.empty(), "{}: the mutable spilled files must be empty when restore.", config.spill_id); + auto & partition_spilled_files = spilled_files[partition_id]->immutable_spilled_files; + + if (max_stream_size == 0) + max_stream_size = partition_spilled_files.size(); + if (is_input_sorted && partition_spilled_files.size() > max_stream_size) { - auto & file = partition_spilled_files[i]; - RUNTIME_CHECK_MSG(file->exists(), "Spill file {} does not exists", file->path()); - details.merge(file->getSpillDetails()); - file_infos[i % spill_file_read_stream_num].push_back(file->path()); - restore_stream_read_rows[i % spill_file_read_stream_num] += file->getSpillDetails().rows; - if (release_spilled_file_on_restore) - file_infos[i % spill_file_read_stream_num].back().file = std::move(file); + LOG_WARNING(logger, "Sorted spilled data restore does not take max_stream_size into account"); } - for (UInt64 i = 0; i < spill_file_read_stream_num; ++i) + + SpillDetails details{0, 0, 0}; + UInt64 
spill_file_read_stream_num = is_input_sorted ? partition_spilled_files.size() : std::min(max_stream_size, partition_spilled_files.size()); + std::vector restore_stream_read_rows; + + if (is_input_sorted) { - if (likely(!file_infos[i].empty())) - ret.push_back(std::make_shared(std::move(file_infos[i]), input_schema, header_without_constants, const_column_indexes, config.file_provider, spill_version)); + for (auto & file : partition_spilled_files) + { + RUNTIME_CHECK_MSG(file->exists(), "Spill file {} does not exists", file->path()); + details.merge(file->getSpillDetails()); + std::vector file_infos; + file_infos.emplace_back(file->path()); + restore_stream_read_rows.push_back(file->getSpillDetails().rows); + if (release_spilled_file_on_restore) + file_infos.back().file = std::move(file); + ret.push_back(std::make_shared(std::move(file_infos), input_schema, header_without_constants, const_column_indexes, config.file_provider, spill_version)); + } + } + else + { + std::vector> file_infos(spill_file_read_stream_num); + restore_stream_read_rows.resize(spill_file_read_stream_num, 0); + // todo balance based on SpilledRows + for (size_t i = 0; i < partition_spilled_files.size(); ++i) + { + auto & file = partition_spilled_files[i]; + RUNTIME_CHECK_MSG(file->exists(), "Spill file {} does not exists", file->path()); + details.merge(file->getSpillDetails()); + file_infos[i % spill_file_read_stream_num].push_back(file->path()); + restore_stream_read_rows[i % spill_file_read_stream_num] += file->getSpillDetails().rows; + if (release_spilled_file_on_restore) + file_infos[i % spill_file_read_stream_num].back().file = std::move(file); + } + for (UInt64 i = 0; i < spill_file_read_stream_num; ++i) + { + if (likely(!file_infos[i].empty())) + ret.push_back(std::make_shared(std::move(file_infos[i]), input_schema, header_without_constants, const_column_indexes, config.file_provider, spill_version)); + } + } + for (size_t i = 0; i < spill_file_read_stream_num; ++i) + LOG_TRACE(logger, "Restore {} rows from {}-th stream", restore_stream_read_rows[i], i); + LOG_INFO(logger, "Will restore {} rows from {} files of size {:.3f} MiB compressed, {:.3f} MiB uncompressed using {} streams.", details.rows, spilled_files[partition_id]->immutable_spilled_files.size(), (details.data_bytes_compressed / 1048576.0), (details.data_bytes_uncompressed / 1048576.0), ret.size()); + if (release_spilled_file_on_restore) + { + /// clear the spilled_files so we can safely assume that the element in spilled_files is always not nullptr + partition_spilled_files.clear(); } } - for (size_t i = 0; i < spill_file_read_stream_num; ++i) - LOG_TRACE(logger, "Restore {} rows from {}-th stream", restore_stream_read_rows[i], i); - LOG_INFO(logger, "Will restore {} rows from {} files of size {:.3f} MiB compressed, {:.3f} MiB uncompressed using {} streams.", details.rows, spilled_files[partition_id]->immutable_spilled_files.size(), (details.data_bytes_compressed / 1048576.0), (details.data_bytes_uncompressed / 1048576.0), ret.size()); - if (release_spilled_file_on_restore) - { - /// clear the spilled_files so we can safely assume that the element in spilled_files is always not nullptr - partition_spilled_files.clear(); - } + if (ret.empty()) + { ret.push_back(std::make_shared(input_schema)); + } if (append_dummy_read_stream) { /// if append_dummy_read_stream = true, make sure at least `max_stream_size`'s streams are returned, will be used in join diff --git a/dbms/src/Core/Spiller.h b/dbms/src/Core/Spiller.h index 4d82b9912df..d2417d25c8d 100644 --- 
a/dbms/src/Core/Spiller.h +++ b/dbms/src/Core/Spiller.h @@ -18,6 +18,8 @@ #include #include +#include + namespace DB { class IBlockInputStream; @@ -85,7 +87,6 @@ struct SpilledFiles class Spiller { public: - static bool supportSpill(const Block & header); Spiller(const SpillConfig & config, bool is_input_sorted, UInt64 partition_num, const Block & input_schema, const LoggerPtr & logger, Int64 spill_version = 1, bool release_spilled_file_on_restore = true); void spillBlocks(Blocks && blocks, UInt64 partition_id); SpillHandler createSpillHandler(UInt64 partition_id); @@ -113,7 +114,16 @@ class Spiller std::lock_guard lock(spill_finished_mutex); return spill_finished; } + bool isAllConstant() const { return header_without_constants.columns() == 0; } + void recordAllConstantBlockRows(UInt64 partition_id, UInt64 rows) + { + assert(isAllConstant()); + RUNTIME_CHECK_MSG(isSpillFinished() == false, "{}: spill after the spiller is finished.", config.spill_id); + std::lock_guard lock(all_constant_mutex); + all_constant_block_rows[partition_id] += rows; + } +private: SpillConfig config; const bool is_input_sorted; const UInt64 partition_num; @@ -127,6 +137,12 @@ class Spiller std::atomic has_spilled_data{false}; static std::atomic tmp_file_index; std::vector> spilled_files; + + // Used for the case that spilled blocks containing only constant columns. + // Record the rows of these blocks. + std::mutex all_constant_mutex; + std::vector all_constant_block_rows; + const Int64 spill_version = 1; /// If release_spilled_file_on_restore is true, the spilled file will be released once all the data in the spilled /// file is read, otherwise, the spilled file will be released when destruct the spiller. Currently, all the spilled diff --git a/dbms/src/Core/tests/gtest_spiller.cpp b/dbms/src/Core/tests/gtest_spiller.cpp index efa7437eafc..81f7bfcae48 100644 --- a/dbms/src/Core/tests/gtest_spiller.cpp +++ b/dbms/src/Core/tests/gtest_spiller.cpp @@ -13,6 +13,7 @@ // limitations under the License. 
#include +#include #include #include #include @@ -22,7 +23,6 @@ #include #include - namespace DB { namespace tests @@ -235,6 +235,7 @@ try } } spiller.finishSpill(); + ASSERT_TRUE(spiller.hasSpilledData()); for (size_t partition_id = 0; partition_id < partition_num; ++partition_id) { size_t max_restore_streams = 2 + partition_id * 10; @@ -271,6 +272,7 @@ try } } spiller->finishSpill(); + ASSERT_TRUE(spiller->hasSpilledData()); for (size_t partition_id = 0; partition_id < partition_num; ++partition_id) { size_t max_restore_streams = 2 + partition_id * 10; @@ -301,6 +303,7 @@ try auto blocks_to_spill = blocks; spiller->spillBlocks(std::move(blocks_to_spill), 0); spiller->finishSpill(); + ASSERT_TRUE(spiller->hasSpilledData()); verifyRestoreBlocks(*spiller, 0, 0, 0, blocks); if (!spiller->releaseSpilledFileOnRestore()) verifyRestoreBlocks(*spiller, 0, 0, 0, blocks); @@ -331,6 +334,7 @@ try } } spiller.finishSpill(); + ASSERT_TRUE(spiller.hasSpilledData()); for (size_t partition_id = 0; partition_id < partition_num; ++partition_id) { size_t max_restore_streams = 2 + partition_id * 10; @@ -367,6 +371,7 @@ try } } spiller->finishSpill(); + ASSERT_TRUE(spiller->hasSpilledData()); for (size_t partition_id = 0; partition_id < partition_num; ++partition_id) { size_t max_restore_streams = 2 + partition_id * 10; @@ -395,6 +400,7 @@ try spiller.spillBlocks(std::move(blocks), 0); spiller.spillBlocks(std::move(blocks_copy), 0); spiller.finishSpill(); + ASSERT_TRUE(spiller.hasSpilledData()); verifyRestoreBlocks(spiller, 0, 20, 1, all_blocks, false); } /// append_dummy_read = true @@ -409,6 +415,7 @@ try spiller.spillBlocks(std::move(blocks), 0); spiller.spillBlocks(std::move(blocks_copy), 0); spiller.finishSpill(); + ASSERT_TRUE(spiller.hasSpilledData()); verifyRestoreBlocks(spiller, 0, 20, 20, all_blocks, true); } } @@ -431,6 +438,7 @@ try spiller.spillBlocks(std::move(blocks), 0); spiller.spillBlocks(std::move(blocks_copy), 0); spiller.finishSpill(); + ASSERT_TRUE(spiller.hasSpilledData()); verifyRestoreBlocks(spiller, 0, 2, 1, all_blocks); } /// case 2, one spill write to multiple files @@ -441,6 +449,7 @@ try auto reference = all_blocks; spiller.spillBlocks(std::move(all_blocks), 0); spiller.finishSpill(); + ASSERT_TRUE(spiller.hasSpilledData()); verifyRestoreBlocks(spiller, 0, 0, 20, reference); } /// case 3, spill empty blocks to existing spilled file @@ -456,6 +465,7 @@ try auto block_input_stream = std::make_shared(std::move(empty_blocks_list)); spiller.spillBlocksUsingBlockInputStream(block_input_stream, 0, []() { return false; }); spiller.finishSpill(); + ASSERT_TRUE(spiller.hasSpilledData()); verifyRestoreBlocks(spiller, 0, 2, 1, reference); } /// case 4, spill empty blocks to new spilled file @@ -502,14 +512,89 @@ try for (auto & type_and_name : constant_header) type_and_name.column = type_and_name.type->createColumnConst(1, Field(static_cast(1))); - Spiller spiller(*spill_config_ptr, false, 1, constant_header, logger); - spiller.spillBlocks({constant_header}, 0); - GTEST_FAIL(); -} -catch (Exception & e) -{ - GTEST_ASSERT_EQ(e.message().find("Try to spill blocks containing only constant columns, it is meaningless to spill blocks containing only constant columns") != std::string::npos, true); + { + Blocks blocks; + size_t rows = 10; + size_t block_num = 10; + for (size_t i = 0; i < block_num; ++i) + { + Block block = constant_header; + for (auto & col : block) + col.column = col.column->cloneResized(rows); + blocks.push_back(std::move(block)); + } + auto new_spill_path = fmt::format("{}{}_{}", 
spill_config_ptr->spill_dir, "SpillAllConstantBlock1", rand()); + SpillConfig new_spill_config(new_spill_path, spill_config_ptr->spill_id, spill_config_ptr->max_cached_data_bytes_in_spiller, 0, 0, spill_config_ptr->file_provider, 100, DEFAULT_BLOCK_SIZE); + Spiller spiller(new_spill_config, false, 1, constant_header, logger); + spiller.spillBlocks(std::move(blocks), 0); + spiller.finishSpill(); + ASSERT_TRUE(spiller.hasSpilledData()); + + Block expected_block = constant_header; + for (auto & col : expected_block) + col.column = col.column->cloneResized(rows * block_num); + verifyRestoreBlocks(spiller, 0, 100, 1, {expected_block}); + } + + { + Blocks blocks; + size_t block_num = DEFAULT_BLOCK_SIZE + 1; + for (size_t i = 0; i < block_num; ++i) + { + Block block = constant_header; + for (auto & col : block) + col.column = col.column->cloneResized(1); + blocks.push_back(std::move(block)); + } + auto new_spill_path = fmt::format("{}{}_{}", spill_config_ptr->spill_dir, "SpillAllConstantBlock2", rand()); + SpillConfig new_spill_config(new_spill_path, spill_config_ptr->spill_id, spill_config_ptr->max_cached_data_bytes_in_spiller, 0, 0, spill_config_ptr->file_provider, 100, DEFAULT_BLOCK_SIZE); + Spiller spiller(new_spill_config, false, 1, constant_header, logger); + spiller.spillBlocks(std::move(blocks), 0); + spiller.finishSpill(); + ASSERT_TRUE(spiller.hasSpilledData()); + + Blocks expected_blocks; + Block block1 = constant_header; + for (auto & col : block1) + col.column = col.column->cloneResized(new_spill_config.for_all_constant_block_size); + expected_blocks.push_back(std::move(block1)); + Block block2 = constant_header; + for (auto & col : block2) + col.column = col.column->cloneResized(1); + expected_blocks.push_back(std::move(block2)); + verifyRestoreBlocks(spiller, 0, 100, 2, expected_blocks); + } + + { + Blocks blocks; + size_t block_num = DEFAULT_BLOCK_SIZE + 1; + for (size_t i = 0; i < block_num; ++i) + { + Block block = constant_header; + for (auto & col : block) + col.column = col.column->cloneResized(1); + blocks.push_back(std::move(block)); + } + auto new_spill_path = fmt::format("{}{}_{}", spill_config_ptr->spill_dir, "SpillAllConstantBlock3", rand()); + SpillConfig new_spill_config(new_spill_path, spill_config_ptr->spill_id, spill_config_ptr->max_cached_data_bytes_in_spiller, 0, 0, spill_config_ptr->file_provider, 100, DEFAULT_BLOCK_SIZE); + Spiller spiller(new_spill_config, false, 1, constant_header, logger); + spiller.spillBlocks(std::move(blocks), 0); + spiller.finishSpill(); + ASSERT_TRUE(spiller.hasSpilledData()); + + Blocks expected_blocks; + Block block1 = constant_header; + for (auto & col : block1) + col.column = col.column->cloneResized(new_spill_config.for_all_constant_block_size); + expected_blocks.push_back(std::move(block1)); + Block block2 = constant_header; + for (auto & col : block2) + col.column = col.column->cloneResized(1); + expected_blocks.push_back(std::move(block2)); + verifyRestoreBlocks(spiller, 0, 1, 1, expected_blocks); + } } +CATCH TEST_F(SpillerTest, SpillWithConstantSchemaAndNonConstantData) try diff --git a/dbms/src/DataStreams/ConstantsBlockInputStream.h b/dbms/src/DataStreams/ConstantsBlockInputStream.h new file mode 100644 index 00000000000..291b7f66b83 --- /dev/null +++ b/dbms/src/DataStreams/ConstantsBlockInputStream.h @@ -0,0 +1,59 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include + +namespace DB +{ +class ConstantsBlockInputStream : public IBlockInputStream +{ +public: + ConstantsBlockInputStream(const Block & header, UInt64 rows_, UInt64 max_block_size_) + : header(header) + , remaining_rows(rows_) + , max_block_size(std::max(1, max_block_size_)) + { + RUNTIME_CHECK_MSG(header.columns() > 0, "the empty header is illegal."); + for (const auto & col : header) + { + RUNTIME_CHECK(col.column != nullptr && col.column->isColumnConst()); + } + } + + Block read() override + { + if unlikely (remaining_rows == 0) + return {}; + + size_t cur_rows = std::min(max_block_size, remaining_rows); + remaining_rows -= cur_rows; + Block block = header; + for (auto & col : block) + { + col.column = col.column->cloneResized(cur_rows); + } + return block; + } + Block getHeader() const override { return header; } + String getName() const override { return "Constants"; } + +private: + const Block header; + UInt64 remaining_rows; + const UInt64 max_block_size; +}; +} // namespace DB diff --git a/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp b/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp index 46d4b5eded9..3c4abda35e8 100644 --- a/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp +++ b/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp @@ -47,15 +47,7 @@ MergeSortingBlockInputStream::MergeSortingBlockInputStream( SortHelper::removeConstantsFromSortDescription(header, description); if (max_bytes_before_external_sort > 0) { - if (Spiller::supportSpill(header_without_constants)) - { - spiller = std::make_unique(spill_config, true, 1, header_without_constants, log); - } - else - { - max_bytes_before_external_sort = 0; - LOG_WARNING(log, "Sort/TopN does not support spill, reason: input data contains only constant columns"); - } + spiller = std::make_unique(spill_config, true, 1, header_without_constants, log); } } diff --git a/dbms/src/DataStreams/NullBlockInputStream.h b/dbms/src/DataStreams/NullBlockInputStream.h index 3eae75111ca..456aaf76db3 100644 --- a/dbms/src/DataStreams/NullBlockInputStream.h +++ b/dbms/src/DataStreams/NullBlockInputStream.h @@ -25,7 +25,9 @@ namespace DB class NullBlockInputStream : public IBlockInputStream { public: - NullBlockInputStream(const Block & header) : header(header) {} + explicit NullBlockInputStream(const Block & header) + : header(header) + {} Block read() override { return {}; } Block getHeader() const override { return header; } @@ -35,4 +37,4 @@ class NullBlockInputStream : public IBlockInputStream Block header; }; -} +} // namespace DB diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp index 01fb662541f..7b6e8c2834a 100644 --- a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp @@ -270,8 +270,8 @@ void DAGQueryBlockInterpreter::handleJoin(const tipb::Join & join, DAGPipeline & tiflash_join.fillJoinOtherConditionsAction(context, left_input_header, right_input_header, probe_side_prepare_actions, original_probe_key_names, 
original_build_key_names, join_non_equal_conditions); const Settings & settings = context.getSettingsRef(); - SpillConfig build_spill_config(context.getTemporaryPath(), fmt::format("{}_hash_join_0_build", log->identifier()), settings.max_cached_data_bytes_in_spiller, settings.max_spilled_rows_per_file, settings.max_spilled_bytes_per_file, context.getFileProvider()); - SpillConfig probe_spill_config(context.getTemporaryPath(), fmt::format("{}_hash_join_0_probe", log->identifier()), settings.max_cached_data_bytes_in_spiller, settings.max_spilled_rows_per_file, settings.max_spilled_bytes_per_file, context.getFileProvider()); + SpillConfig build_spill_config(context.getTemporaryPath(), fmt::format("{}_hash_join_0_build", log->identifier()), settings.max_cached_data_bytes_in_spiller, settings.max_spilled_rows_per_file, settings.max_spilled_bytes_per_file, context.getFileProvider(), settings.max_threads, settings.max_block_size); + SpillConfig probe_spill_config(context.getTemporaryPath(), fmt::format("{}_hash_join_0_probe", log->identifier()), settings.max_cached_data_bytes_in_spiller, settings.max_spilled_rows_per_file, settings.max_spilled_bytes_per_file, context.getFileProvider(), settings.max_threads, settings.max_block_size); size_t max_block_size = settings.max_block_size; fiu_do_on(FailPoints::minimum_block_size_for_cross_join, { max_block_size = 1; }); @@ -413,7 +413,9 @@ void DAGQueryBlockInterpreter::executeAggregation( settings.max_cached_data_bytes_in_spiller, settings.max_spilled_rows_per_file, settings.max_spilled_bytes_per_file, - context.getFileProvider()); + context.getFileProvider(), + settings.max_threads, + settings.max_block_size); auto params = AggregationInterpreterHelper::buildParams( context, before_agg_header, diff --git a/dbms/src/Flash/Pipeline/Schedule/TaskQueues/IOPriorityQueue.h b/dbms/src/Flash/Pipeline/Schedule/TaskQueues/IOPriorityQueue.h index 05307275757..65e85f201bf 100644 --- a/dbms/src/Flash/Pipeline/Schedule/TaskQueues/IOPriorityQueue.h +++ b/dbms/src/Flash/Pipeline/Schedule/TaskQueues/IOPriorityQueue.h @@ -29,7 +29,7 @@ namespace DB class IOPriorityQueue : public TaskQueue { public: - // // The ratio of total execution time between io_in and io_out is 3:1. + // The ratio of total execution time between io_out and io_in is 3:1. 
static constexpr size_t ratio_of_out_to_in = 3; ~IOPriorityQueue() override; diff --git a/dbms/src/Flash/Pipeline/Schedule/TaskQueues/tests/gtest_mlfq.cpp b/dbms/src/Flash/Pipeline/Schedule/TaskQueues/tests/gtest_mlfq.cpp index cbfccd0e7de..801cba5973c 100644 --- a/dbms/src/Flash/Pipeline/Schedule/TaskQueues/tests/gtest_mlfq.cpp +++ b/dbms/src/Flash/Pipeline/Schedule/TaskQueues/tests/gtest_mlfq.cpp @@ -78,7 +78,7 @@ try { TaskPtr task = std::make_unique(); auto value = mock_value(); - queue->updateStatistics(task, ExecTaskStatus::INIT, value); + queue->updateStatistics(task, ExecTaskStatus::RUNNING, value); task->profile_info.addCPUExecuteTime(value); queue->submit(std::move(task)); } @@ -113,7 +113,7 @@ try ASSERT_EQ(task->mlfq_level, level); ASSERT_TRUE(task); auto value = CPUMultiLevelFeedbackQueue::LEVEL_TIME_SLICE_BASE_NS; - queue.updateStatistics(task, ExecTaskStatus::INIT, value); + queue.updateStatistics(task, ExecTaskStatus::RUNNING, value); task->profile_info.addCPUExecuteTime(value); bool need_break = CPUTimeGetter::get(task) >= queue.getUnitQueueInfo(level).time_slice; queue.submit(std::move(task)); @@ -140,7 +140,7 @@ try TaskPtr task = std::make_unique(); task->mlfq_level = CPUMultiLevelFeedbackQueue::QUEUE_SIZE - 1; auto value = queue.getUnitQueueInfo(task->mlfq_level).time_slice; - queue.updateStatistics(task, ExecTaskStatus::INIT, value); + queue.updateStatistics(task, ExecTaskStatus::RUNNING, value); task->profile_info.addCPUExecuteTime(value); queue.submit(std::move(task)); } @@ -148,7 +148,7 @@ try // level `0` TaskPtr task = std::make_unique(); auto value = queue.getUnitQueueInfo(0).time_slice - 1; - queue.updateStatistics(task, ExecTaskStatus::INIT, value); + queue.updateStatistics(task, ExecTaskStatus::RUNNING, value); task->profile_info.addCPUExecuteTime(value); queue.submit(std::move(task)); } @@ -174,7 +174,7 @@ try // level `0` TaskPtr task = std::make_unique(); auto value = queue.getUnitQueueInfo(0).time_slice - 1; - queue.updateStatistics(task, ExecTaskStatus::INIT, value); + queue.updateStatistics(task, ExecTaskStatus::RUNNING, value); task->profile_info.addCPUExecuteTime(value); queue.submit(std::move(task)); } @@ -183,7 +183,7 @@ try TaskPtr task = std::make_unique(); task->mlfq_level = CPUMultiLevelFeedbackQueue::QUEUE_SIZE - 1; auto value = queue.getUnitQueueInfo(task->mlfq_level).time_slice; - queue.updateStatistics(task, ExecTaskStatus::INIT, value); + queue.updateStatistics(task, ExecTaskStatus::RUNNING, value); task->profile_info.addCPUExecuteTime(value); queue.submit(std::move(task)); } diff --git a/dbms/src/Flash/Pipeline/Schedule/TaskScheduler.cpp b/dbms/src/Flash/Pipeline/Schedule/TaskScheduler.cpp index 58d05dcb814..46f6537d590 100644 --- a/dbms/src/Flash/Pipeline/Schedule/TaskScheduler.cpp +++ b/dbms/src/Flash/Pipeline/Schedule/TaskScheduler.cpp @@ -40,12 +40,59 @@ TaskScheduler::~TaskScheduler() wait_reactor.waitForStop(); } +void TaskScheduler::submit(TaskPtr && task) +{ + auto task_status = task->getStatus(); + switch (task_status) + { + case ExecTaskStatus::RUNNING: + submitToCPUTaskThreadPool(std::move(task)); + break; + case ExecTaskStatus::IO_IN: + case ExecTaskStatus::IO_OUT: + submitToIOTaskThreadPool(std::move(task)); + break; + case ExecTaskStatus::WAITING: + submitToWaitReactor(std::move(task)); + break; + default: + throw Exception(fmt::format("Unexpected task status: {}", magic_enum::enum_name(task_status))); + } +} + void TaskScheduler::submit(std::vector & tasks) { if (unlikely(tasks.empty())) return; - 
cpu_task_thread_pool.submit(tasks); + std::vector cpu_tasks; + std::vector io_tasks; + std::list await_tasks; + for (auto & task : tasks) + { + auto task_status = task->getStatus(); + switch (task_status) + { + case ExecTaskStatus::RUNNING: + cpu_tasks.push_back(std::move(task)); + break; + case ExecTaskStatus::IO_IN: + case ExecTaskStatus::IO_OUT: + io_tasks.push_back(std::move(task)); + break; + case ExecTaskStatus::WAITING: + await_tasks.push_back(std::move(task)); + break; + default: + throw Exception(fmt::format("Unexpected task status: {}", magic_enum::enum_name(task_status))); + } + } + if (!cpu_tasks.empty()) + submitToCPUTaskThreadPool(cpu_tasks); + if (!io_tasks.empty()) + submitToIOTaskThreadPool(io_tasks); + if (!await_tasks.empty()) + wait_reactor.submit(await_tasks); } void TaskScheduler::submitToWaitReactor(TaskPtr && task) diff --git a/dbms/src/Flash/Pipeline/Schedule/TaskScheduler.h b/dbms/src/Flash/Pipeline/Schedule/TaskScheduler.h index 1ad21525e33..80ea4349621 100644 --- a/dbms/src/Flash/Pipeline/Schedule/TaskScheduler.h +++ b/dbms/src/Flash/Pipeline/Schedule/TaskScheduler.h @@ -58,6 +58,7 @@ class TaskScheduler ~TaskScheduler(); + void submit(TaskPtr && task); void submit(std::vector & tasks); void submitToWaitReactor(TaskPtr && task); diff --git a/dbms/src/Flash/Pipeline/Schedule/Tasks/EventTask.cpp b/dbms/src/Flash/Pipeline/Schedule/Tasks/EventTask.cpp index a23c6a947b0..ba378d1bd2c 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Tasks/EventTask.cpp +++ b/dbms/src/Flash/Pipeline/Schedule/Tasks/EventTask.cpp @@ -57,8 +57,9 @@ EventTask::EventTask( MemoryTrackerPtr mem_tracker_, const String & req_id, PipelineExecutorStatus & exec_status_, - const EventPtr & event_) - : Task(std::move(mem_tracker_), req_id) + const EventPtr & event_, + ExecTaskStatus init_status) + : Task(std::move(mem_tracker_), req_id, init_status) , exec_status(exec_status_) , event(event_) { diff --git a/dbms/src/Flash/Pipeline/Schedule/Tasks/EventTask.h b/dbms/src/Flash/Pipeline/Schedule/Tasks/EventTask.h index 215c9116130..6b65ca295c3 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Tasks/EventTask.h +++ b/dbms/src/Flash/Pipeline/Schedule/Tasks/EventTask.h @@ -25,6 +25,7 @@ class PipelineExecutorStatus; class EventTask : public Task { public: + // Only used for unit test. EventTask( PipelineExecutorStatus & exec_status_, const EventPtr & event_); @@ -32,7 +33,8 @@ class EventTask : public Task MemoryTrackerPtr mem_tracker_, const String & req_id, PipelineExecutorStatus & exec_status_, - const EventPtr & event_); + const EventPtr & event_, + ExecTaskStatus init_status = ExecTaskStatus::RUNNING); protected: ExecTaskStatus executeImpl() override; diff --git a/dbms/src/Flash/Pipeline/Schedule/Tasks/IOEventTask.h b/dbms/src/Flash/Pipeline/Schedule/Tasks/IOEventTask.h index b4affb98d3f..27e6bd5a1ae 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Tasks/IOEventTask.h +++ b/dbms/src/Flash/Pipeline/Schedule/Tasks/IOEventTask.h @@ -27,7 +27,12 @@ class IOEventTask : public EventTask const String & req_id, PipelineExecutorStatus & exec_status_, const EventPtr & event_) - : EventTask(std::move(mem_tracker_), req_id, exec_status_, event_) + : EventTask( + std::move(mem_tracker_), + req_id, + exec_status_, + event_, + is_input ? 
ExecTaskStatus::IO_IN : ExecTaskStatus::IO_OUT) { } diff --git a/dbms/src/Flash/Pipeline/Schedule/Tasks/PipelineTask.cpp b/dbms/src/Flash/Pipeline/Schedule/Tasks/PipelineTask.cpp index bd17d626ed8..512a2b10e8c 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Tasks/PipelineTask.cpp +++ b/dbms/src/Flash/Pipeline/Schedule/Tasks/PipelineTask.cpp @@ -25,7 +25,7 @@ PipelineTask::PipelineTask( PipelineExecutorStatus & exec_status_, const EventPtr & event_, PipelineExecPtr && pipeline_exec_) - : EventTask(std::move(mem_tracker_), req_id, exec_status_, event_) + : EventTask(std::move(mem_tracker_), req_id, exec_status_, event_, ExecTaskStatus::RUNNING) , pipeline_exec_holder(std::move(pipeline_exec_)) , pipeline_exec(pipeline_exec_holder.get()) { diff --git a/dbms/src/Flash/Pipeline/Schedule/Tasks/Task.cpp b/dbms/src/Flash/Pipeline/Schedule/Tasks/Task.cpp index 6ed24ba19a4..77ca4363622 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Tasks/Task.cpp +++ b/dbms/src/Flash/Pipeline/Schedule/Tasks/Task.cpp @@ -48,7 +48,6 @@ ALWAYS_INLINE void addToStatusMetrics(ExecTaskStatus to) } #endif - // It is impossible for any task to change to init status. switch (to) { M(ExecTaskStatus::WAITING, type_to_waiting) @@ -70,19 +69,21 @@ Task::Task() : log(Logger::get()) , mem_tracker_holder(nullptr) , mem_tracker_ptr(nullptr) + , task_status(ExecTaskStatus::RUNNING) { FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_pipeline_model_task_construct_failpoint); - GET_METRIC(tiflash_pipeline_task_change_to_status, type_to_init).Increment(); + addToStatusMetrics(task_status); } -Task::Task(MemoryTrackerPtr mem_tracker_, const String & req_id) +Task::Task(MemoryTrackerPtr mem_tracker_, const String & req_id, ExecTaskStatus init_status) : log(Logger::get(req_id)) , mem_tracker_holder(std::move(mem_tracker_)) , mem_tracker_ptr(mem_tracker_holder.get()) + , task_status(init_status) { assert(mem_tracker_holder.get() == mem_tracker_ptr); FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_pipeline_model_task_construct_failpoint); - GET_METRIC(tiflash_pipeline_task_change_to_status, type_to_init).Increment(); + addToStatusMetrics(task_status); } Task::~Task() @@ -99,7 +100,7 @@ Task::~Task() ExecTaskStatus Task::execute() { assert(mem_tracker_ptr == current_memory_tracker); - assert(task_status == ExecTaskStatus::RUNNING || task_status == ExecTaskStatus::INIT); + assert(task_status == ExecTaskStatus::RUNNING); switchStatus(executeImpl()); return task_status; } @@ -107,7 +108,7 @@ ExecTaskStatus Task::execute() ExecTaskStatus Task::executeIO() { assert(mem_tracker_ptr == current_memory_tracker); - assert(task_status == ExecTaskStatus::IO_IN || task_status == ExecTaskStatus::IO_OUT || task_status == ExecTaskStatus::INIT); + assert(task_status == ExecTaskStatus::IO_IN || task_status == ExecTaskStatus::IO_OUT); switchStatus(executeIOImpl()); return task_status; } @@ -117,7 +118,7 @@ ExecTaskStatus Task::await() // Because await only performs polling checks and does not involve computing/memory tracker memory allocation, // await will not invoke MemoryTracker, so current_memory_tracker must be nullptr here. 
assert(current_memory_tracker == nullptr); - assert(task_status == ExecTaskStatus::WAITING || task_status == ExecTaskStatus::INIT); + assert(task_status == ExecTaskStatus::WAITING); switchStatus(awaitImpl()); return task_status; } diff --git a/dbms/src/Flash/Pipeline/Schedule/Tasks/Task.h b/dbms/src/Flash/Pipeline/Schedule/Tasks/Task.h index be2709f4277..f629cdb759c 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Tasks/Task.h +++ b/dbms/src/Flash/Pipeline/Schedule/Tasks/Task.h @@ -34,7 +34,6 @@ namespace DB */ enum class ExecTaskStatus { - INIT, WAITING, RUNNING, IO_IN, @@ -47,9 +46,10 @@ enum class ExecTaskStatus class Task { public: + // Only used for unit test. Task(); - Task(MemoryTrackerPtr mem_tracker_, const String & req_id); + Task(MemoryTrackerPtr mem_tracker_, const String & req_id, ExecTaskStatus init_status = ExecTaskStatus::RUNNING); virtual ~Task(); @@ -104,7 +104,7 @@ class Task // To reduce the overheads of `mem_tracker_holder.get()` MemoryTracker * mem_tracker_ptr; - ExecTaskStatus task_status{ExecTaskStatus::INIT}; + ExecTaskStatus task_status; bool is_finalized = false; }; diff --git a/dbms/src/Flash/Planner/Plans/PhysicalAggregation.cpp b/dbms/src/Flash/Planner/Plans/PhysicalAggregation.cpp index bd862923da2..bcfd04e5fc2 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalAggregation.cpp +++ b/dbms/src/Flash/Planner/Plans/PhysicalAggregation.cpp @@ -103,7 +103,9 @@ void PhysicalAggregation::buildBlockInputStreamImpl(DAGPipeline & pipeline, Cont context.getSettingsRef().max_cached_data_bytes_in_spiller, context.getSettingsRef().max_spilled_rows_per_file, context.getSettingsRef().max_spilled_bytes_per_file, - context.getFileProvider()); + context.getFileProvider(), + context.getSettingsRef().max_threads, + context.getSettingsRef().max_block_size); auto params = AggregationInterpreterHelper::buildParams( context, before_agg_header, @@ -184,7 +186,9 @@ void PhysicalAggregation::buildPipelineExecGroupImpl( context.getSettingsRef().max_cached_data_bytes_in_spiller, context.getSettingsRef().max_spilled_rows_per_file, context.getSettingsRef().max_spilled_bytes_per_file, - context.getFileProvider()); + context.getFileProvider(), + context.getSettingsRef().max_threads, + context.getSettingsRef().max_block_size); auto params = AggregationInterpreterHelper::buildParams( context, before_agg_header, diff --git a/dbms/src/Flash/Planner/Plans/PhysicalAggregationBuild.cpp b/dbms/src/Flash/Planner/Plans/PhysicalAggregationBuild.cpp index 3c24495ab18..9483c29e534 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalAggregationBuild.cpp +++ b/dbms/src/Flash/Planner/Plans/PhysicalAggregationBuild.cpp @@ -44,7 +44,9 @@ void PhysicalAggregationBuild::buildPipelineExecGroupImpl( context.getSettingsRef().max_cached_data_bytes_in_spiller, context.getSettingsRef().max_spilled_rows_per_file, context.getSettingsRef().max_spilled_bytes_per_file, - context.getFileProvider()); + context.getFileProvider(), + context.getSettingsRef().max_threads, + context.getSettingsRef().max_block_size); auto params = AggregationInterpreterHelper::buildParams( context, before_agg_header, diff --git a/dbms/src/Flash/Planner/Plans/PhysicalJoin.cpp b/dbms/src/Flash/Planner/Plans/PhysicalJoin.cpp index ec3c0e55a01..9335cbaced8 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalJoin.cpp +++ b/dbms/src/Flash/Planner/Plans/PhysicalJoin.cpp @@ -127,8 +127,8 @@ PhysicalPlanNodePtr PhysicalJoin::build( max_bytes_before_external_join = 0; LOG_WARNING(log, "Pipeline model does not support disk-based join, so set 
max_bytes_before_external_join = 0"); } - SpillConfig build_spill_config(context.getTemporaryPath(), fmt::format("{}_hash_join_0_build", log->identifier()), settings.max_cached_data_bytes_in_spiller, settings.max_spilled_rows_per_file, settings.max_spilled_bytes_per_file, context.getFileProvider()); - SpillConfig probe_spill_config(context.getTemporaryPath(), fmt::format("{}_hash_join_0_probe", log->identifier()), settings.max_cached_data_bytes_in_spiller, settings.max_spilled_rows_per_file, settings.max_spilled_bytes_per_file, context.getFileProvider()); + SpillConfig build_spill_config(context.getTemporaryPath(), fmt::format("{}_hash_join_0_build", log->identifier()), settings.max_cached_data_bytes_in_spiller, settings.max_spilled_rows_per_file, settings.max_spilled_bytes_per_file, context.getFileProvider(), settings.max_threads, settings.max_block_size); + SpillConfig probe_spill_config(context.getTemporaryPath(), fmt::format("{}_hash_join_0_probe", log->identifier()), settings.max_cached_data_bytes_in_spiller, settings.max_spilled_rows_per_file, settings.max_spilled_bytes_per_file, context.getFileProvider(), settings.max_threads, settings.max_block_size); size_t max_block_size = settings.max_block_size; fiu_do_on(FailPoints::minimum_block_size_for_cross_join, { max_block_size = 1; }); diff --git a/dbms/src/Interpreters/Aggregator.cpp b/dbms/src/Interpreters/Aggregator.cpp index d1c33a2f893..f1f77cf86b0 100644 --- a/dbms/src/Interpreters/Aggregator.cpp +++ b/dbms/src/Interpreters/Aggregator.cpp @@ -285,8 +285,7 @@ Aggregator::Aggregator(const Params & params_, const String & req_id) /// init spiller if needed auto header = getHeader(false); bool is_convertible_to_two_level = AggregatedDataVariants::isConvertibleToTwoLevel(method_chosen); - bool is_input_support_spill = Spiller::supportSpill(header); - if (is_convertible_to_two_level && is_input_support_spill) + if (is_convertible_to_two_level) { /// for aggregation, the input block is sorted by bucket number /// so it can work with MergingAggregatedMemoryEfficientBlockInputStream @@ -295,7 +294,7 @@ Aggregator::Aggregator(const Params & params_, const String & req_id) else { params.setMaxBytesBeforeExternalGroupBy(0); - LOG_WARNING(log, "Aggregation does not support spill, reason: {}", is_convertible_to_two_level ? 
"aggregator hash table does not support two level" : "input data contains only constant columns"); + LOG_WARNING(log, "Aggregation does not support spill because aggregator hash table does not support two level"); } } } diff --git a/dbms/src/Interpreters/Join.cpp b/dbms/src/Interpreters/Join.cpp index 05b8cc3a6e6..02d5cb1dbe3 100644 --- a/dbms/src/Interpreters/Join.cpp +++ b/dbms/src/Interpreters/Join.cpp @@ -30,6 +30,7 @@ #include #include +#include #include namespace DB @@ -136,7 +137,7 @@ Join::Join( const String & flag_mapped_entry_helper_name_, size_t restore_round_, bool is_test_, - const std::vector runtime_filter_list_) + const std::vector & runtime_filter_list_) : restore_round(restore_round_) , match_helper_name(match_helper_name_) , flag_mapped_entry_helper_name(flag_mapped_entry_helper_name_) @@ -360,11 +361,6 @@ void Join::initBuild(const Block & sample_block, size_t build_concurrency_) max_bytes_before_external_join = 0; LOG_WARNING(log, "Join does not support spill, reason: null aware join spill is not supported"); } - if (!Spiller::supportSpill(build_sample_block)) - { - max_bytes_before_external_join = 0; - LOG_WARNING(log, "Join does not support spill, reason: input data from build side contains only constant columns"); - } if (max_bytes_before_external_join > 0) build_spiller = std::make_unique(build_spill_config, false, build_concurrency_, build_sample_block, log); } @@ -377,18 +373,7 @@ void Join::initProbe(const Block & sample_block, size_t probe_concurrency_) setProbeConcurrency(probe_concurrency_); probe_sample_block = sample_block; if (max_bytes_before_external_join > 0) - { - if (!Spiller::supportSpill(probe_sample_block)) - { - max_bytes_before_external_join = 0; - build_spiller = nullptr; - LOG_WARNING(log, "Join does not support spill, reason: input data from probe side contains only constant columns"); - } - else - { - probe_spiller = std::make_unique(probe_spill_config, false, build_concurrency, probe_sample_block, log); - } - } + probe_spiller = std::make_unique(probe_spill_config, false, build_concurrency, probe_sample_block, log); } /// the block should be valid. 
@@ -474,7 +459,7 @@ void Join::insertFromBlock(const Block & block, size_t stream_index) continue; } } - build_spiller->spillBlocks(std::move(blocks_to_spill), i); + spillBuildSideBlocks(i, std::move(blocks_to_spill)); } #ifdef DBMS_PUBLIC_GTEST // for join spill to disk gtest @@ -1758,6 +1743,16 @@ IColumn::Selector Join::selectDispatchBlock(const Strings & key_columns_names, c return hashToSelector(hash); } +void Join::spillBuildSideBlocks(UInt64 part_id, Blocks && blocks) +{ + build_spiller->spillBlocks(std::move(blocks), part_id); +} + +void Join::spillProbeSideBlocks(UInt64 part_id, Blocks && blocks) +{ + probe_spiller->spillBlocks(std::move(blocks), part_id); +} + void Join::spillMostMemoryUsedPartitionIfNeed() { Int64 target_partition_index = -1; @@ -1805,7 +1800,7 @@ void Join::spillMostMemoryUsedPartitionIfNeed() blocks_to_spill = partitions[target_partition_index]->trySpillBuildPartition(true, build_spill_config.max_cached_data_bytes_in_spiller, partition_lock); spilled_partition_indexes.push_back(target_partition_index); } - build_spiller->spillBlocks(std::move(blocks_to_spill), target_partition_index); + spillBuildSideBlocks(target_partition_index, std::move(blocks_to_spill)); LOG_DEBUG(log, fmt::format("all bytes used after spill: {}", getTotalByteCount())); } @@ -1833,70 +1828,63 @@ std::optional Join::getOneRestoreStream(size_t max_block_size_) throw Exception(error_message); try { - LOG_TRACE(log, fmt::format("restore_build_streams {}, restore_probe_streams {}, restore_scan_hash_map_streams {}", restore_build_streams.size(), restore_build_streams.size(), restore_scan_hash_map_streams.size())); - assert(restore_build_streams.size() == restore_probe_streams.size() && restore_build_streams.size() == restore_scan_hash_map_streams.size()); - auto get_back_stream = [](BlockInputStreams & streams) { - BlockInputStreamPtr stream = streams.back(); - streams.pop_back(); - return stream; - }; - if (!restore_build_streams.empty()) + while (true) { - auto build_stream = get_back_stream(restore_build_streams); - auto probe_stream = get_back_stream(restore_probe_streams); - auto scan_hash_map_stream = get_back_stream(restore_scan_hash_map_streams); - if (restore_build_streams.empty()) + LOG_TRACE(log, "restore_infos {}", restore_infos.size()); + if (!restore_infos.empty()) { - spilled_partition_indexes.pop_front(); + auto restore_info = std::move(restore_infos.back()); + restore_infos.pop_back(); + if (restore_infos.empty()) + { + spilled_partition_indexes.pop_front(); + } + return restore_info; } - return RestoreInfo{restore_join, std::move(scan_hash_map_stream), std::move(build_stream), std::move(probe_stream)}; - } - if (spilled_partition_indexes.empty()) - { - return {}; - } - auto spilled_partition_index = spilled_partition_indexes.front(); - RUNTIME_CHECK_MSG(partitions[spilled_partition_index]->isSpill(), "should not restore unspilled partition."); - if (restore_join_build_concurrency <= 0) - restore_join_build_concurrency = getRestoreJoinBuildConcurrency(partitions.size(), spilled_partition_indexes.size(), join_restore_concurrency, probe_concurrency); - /// for restore join we make sure that the build concurrency is at least 2, so it can be spill again - assert(restore_join_build_concurrency >= 2); - LOG_INFO(log, "Begin restore data from disk for hash join, partition {}, restore round {}, build concurrency {}.", spilled_partition_index, restore_round, restore_join_build_concurrency); - restore_build_streams = build_spiller->restoreBlocks(spilled_partition_index, 
restore_join_build_concurrency, true); - restore_probe_streams = probe_spiller->restoreBlocks(spilled_partition_index, restore_join_build_concurrency, true); - restore_scan_hash_map_streams.resize(restore_join_build_concurrency, nullptr); - RUNTIME_CHECK_MSG(restore_build_streams.size() == static_cast(restore_join_build_concurrency), "restore streams size must equal to restore_join_build_concurrency"); - auto new_max_bytes_before_external_join = static_cast(max_bytes_before_external_join * (static_cast(restore_join_build_concurrency) / build_concurrency)); - restore_join = createRestoreJoin(std::max(1, new_max_bytes_before_external_join)); - restore_join->initBuild(build_sample_block, restore_join_build_concurrency); - restore_join->setInitActiveBuildThreads(); - restore_join->initProbe(probe_sample_block, restore_join_build_concurrency); - for (Int64 i = 0; i < restore_join_build_concurrency; i++) - { - restore_build_streams[i] = std::make_shared(restore_build_streams[i], restore_join, i, log->identifier()); - } - auto build_stream = get_back_stream(restore_build_streams); - auto probe_stream = get_back_stream(restore_probe_streams); - if (restore_build_streams.empty()) - { - spilled_partition_indexes.pop_front(); - } - if (needScanHashMapAfterProbe(kind)) - { + if (spilled_partition_indexes.empty()) + { + return {}; + } + + // build new restore infos. + auto spilled_partition_index = spilled_partition_indexes.front(); + RUNTIME_CHECK_MSG(partitions[spilled_partition_index]->isSpill(), "should not restore unspilled partition."); + if (restore_join_build_concurrency <= 0) + restore_join_build_concurrency = getRestoreJoinBuildConcurrency(partitions.size(), spilled_partition_indexes.size(), join_restore_concurrency, probe_concurrency); + /// for restore join we make sure that the build concurrency is at least 2, so it can be spill again + assert(restore_join_build_concurrency >= 2); + LOG_INFO(log, "Begin restore data from disk for hash join, partition {}, restore round {}, build concurrency {}.", spilled_partition_index, restore_round, restore_join_build_concurrency); + auto restore_build_streams = build_spiller->restoreBlocks(spilled_partition_index, restore_join_build_concurrency, true); + RUNTIME_CHECK_MSG(restore_build_streams.size() == static_cast(restore_join_build_concurrency), "restore streams size must equal to restore_join_build_concurrency"); + auto restore_probe_streams = probe_spiller->restoreBlocks(spilled_partition_index, restore_join_build_concurrency, true); + auto new_max_bytes_before_external_join = static_cast(max_bytes_before_external_join * (static_cast(restore_join_build_concurrency) / build_concurrency)); + restore_join = createRestoreJoin(std::max(1, new_max_bytes_before_external_join)); + restore_join->initBuild(build_sample_block, restore_join_build_concurrency); + restore_join->setInitActiveBuildThreads(); + restore_join->initProbe(probe_sample_block, restore_join_build_concurrency); for (Int64 i = 0; i < restore_join_build_concurrency; i++) - restore_scan_hash_map_streams[i] = restore_join->createScanHashMapAfterProbeStream(probe_stream->getHeader(), i, restore_join_build_concurrency, max_block_size_); + { + restore_build_streams[i] = std::make_shared(restore_build_streams[i], restore_join, i, log->identifier()); + } + BlockInputStreams restore_scan_hash_map_streams; + restore_scan_hash_map_streams.resize(restore_join_build_concurrency, nullptr); + if (needScanHashMapAfterProbe(kind)) + { + auto header = restore_probe_streams.back()->getHeader(); + for (Int64 i 
= 0; i < restore_join_build_concurrency; i++) + restore_scan_hash_map_streams[i] = restore_join->createScanHashMapAfterProbeStream(header, i, restore_join_build_concurrency, max_block_size_); + } + for (Int64 i = 0; i < restore_join_build_concurrency; ++i) + { + restore_infos.emplace_back(restore_join, std::move(restore_scan_hash_map_streams[i]), std::move(restore_build_streams[i]), std::move(restore_probe_streams[i])); + } } - auto scan_hash_map_streams = get_back_stream(restore_scan_hash_map_streams); - return RestoreInfo{restore_join, std::move(scan_hash_map_streams), std::move(build_stream), std::move(probe_stream)}; } catch (...) { - restore_build_streams.clear(); - restore_probe_streams.clear(); - restore_scan_hash_map_streams.clear(); - auto err_message = getCurrentExceptionMessage(false, true); - meetErrorImpl(err_message, lock); - throw Exception(err_message); + restore_infos.clear(); + meetErrorImpl(getCurrentExceptionMessage(false, true), lock); + std::rethrow_exception(std::current_exception()); } } @@ -1920,7 +1908,7 @@ void Join::dispatchProbeBlock(Block & block, PartitionBlocks & partition_blocks_ } if (need_spill) { - probe_spiller->spillBlocks(std::move(blocks_to_spill), i); + spillProbeSideBlocks(i, std::move(blocks_to_spill)); } else { @@ -1933,7 +1921,7 @@ void Join::spillAllBuildPartitions() { for (size_t i = 0; i < partitions.size(); ++i) { - build_spiller->spillBlocks(partitions[i]->trySpillBuildPartition(true, build_spill_config.max_cached_data_bytes_in_spiller), i); + spillBuildSideBlocks(i, partitions[i]->trySpillBuildPartition(true, build_spill_config.max_cached_data_bytes_in_spiller)); } } @@ -1941,7 +1929,7 @@ void Join::spillAllProbePartitions() { for (size_t i = 0; i < partitions.size(); ++i) { - probe_spiller->spillBlocks(partitions[i]->trySpillProbePartition(true, probe_spill_config.max_cached_data_bytes_in_spiller), i); + spillProbeSideBlocks(i, partitions[i]->trySpillProbePartition(true, probe_spill_config.max_cached_data_bytes_in_spiller)); } } diff --git a/dbms/src/Interpreters/Join.h b/dbms/src/Interpreters/Join.h index 79bdab56145..9d4ed8ec184 100644 --- a/dbms/src/Interpreters/Join.h +++ b/dbms/src/Interpreters/Join.h @@ -155,7 +155,7 @@ class Join const String & flag_mapped_entry_helper_name_ = "", size_t restore_round = 0, bool is_test = true, - const std::vector runtime_filter_list_ = dummy_runtime_filter_list); + const std::vector & runtime_filter_list_ = dummy_runtime_filter_list); size_t restore_round; @@ -263,6 +263,9 @@ class Join void meetError(const String & error_message); void meetErrorImpl(const String & error_message, std::unique_lock & lock); + void spillBuildSideBlocks(UInt64 part_id, Blocks && blocks); + void spillProbeSideBlocks(UInt64 part_id, Blocks && blocks); + static const String match_helper_prefix; static const DataTypePtr match_helper_type; static const String flag_mapped_entry_helper_prefix; @@ -274,9 +277,6 @@ class Join // used to name the column that records matched map entry before other conditions filter const String flag_mapped_entry_helper_name; - SpillerPtr build_spiller; - SpillerPtr probe_spiller; - private: friend class ScanHashMapAfterProbeBlockInputStream; @@ -333,9 +333,10 @@ class Join bool disable_spill = false; std::atomic peak_build_bytes_usage{0}; - BlockInputStreams restore_build_streams; - BlockInputStreams restore_probe_streams; - BlockInputStreams restore_scan_hash_map_streams; + SpillerPtr build_spiller; + SpillerPtr probe_spiller; + + std::vector restore_infos; Int64 restore_join_build_concurrency = 
-1; JoinPtr restore_join; @@ -355,7 +356,6 @@ class Join size_t right_rows_to_be_added_when_matched_for_cross_join = 0; size_t shallow_copy_cross_probe_threshold; -private: JoinMapMethod join_map_method = JoinMapMethod::EMPTY; Sizes key_sizes; @@ -387,6 +387,7 @@ class Join bool enable_fine_grained_shuffle = false; size_t fine_grained_shuffle_count = 0; +private: /** Set information about structure of right hand of JOIN (joined data). * You must call this method before subsequent calls to insertFromBlock. */ diff --git a/dbms/src/Operators/MergeSortTransformOp.cpp b/dbms/src/Operators/MergeSortTransformOp.cpp index d738c8ab03d..a176771845e 100644 --- a/dbms/src/Operators/MergeSortTransformOp.cpp +++ b/dbms/src/Operators/MergeSortTransformOp.cpp @@ -33,17 +33,7 @@ void MergeSortTransformOp::operatePrefixImpl() assert(!order_desc.empty()); if (max_bytes_before_external_sort > 0) - { - if (Spiller::supportSpill(header_without_constants)) - { - spiller = std::make_unique(spill_config, true, 1, header_without_constants, log); - } - else - { - max_bytes_before_external_sort = 0; - LOG_WARNING(log, "Sort/TopN does not support spill, reason: input data contains only constant columns"); - } - } + spiller = std::make_unique(spill_config, true, 1, header_without_constants, log); } void MergeSortTransformOp::operateSuffixImpl()
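
For reference, below is a minimal usage sketch (not part of the patch) of the new all-constant spill path, modeled on the SpillAllConstantBlock cases in gtest_spiller.cpp above. `constant_header`, `spill_config_ptr`, and `logger` are assumed to be the same test-fixture members used in those cases, and the concrete argument values are illustrative only.

// Illustrative sketch, not authoritative: mirrors the SpillAllConstantBlock tests.
// `constant_header` (all columns constant), `spill_config_ptr` and `logger` are
// assumed to come from the SpillerTest fixture.
SpillConfig config(
    spill_config_ptr->spill_dir,
    spill_config_ptr->spill_id,
    spill_config_ptr->max_cached_data_bytes_in_spiller,
    /*max_spilled_rows_per_file_=*/0,
    /*max_spilled_bytes_per_file_=*/0,
    spill_config_ptr->file_provider,
    /*for_all_constant_max_streams_=*/100,
    /*for_all_constant_block_size_=*/DEFAULT_BLOCK_SIZE);

// Because every column is constant, header_without_constants ends up empty and
// Spiller::isAllConstant() is true: SpillHandler only accumulates row counts via
// recordAllConstantBlockRows() instead of writing any spill file to disk.
Spiller spiller(config, /*is_input_sorted=*/false, /*partition_num=*/1, constant_header, logger);

Blocks blocks;
for (size_t i = 0; i < 10; ++i)
{
    Block block = constant_header;
    for (auto & col : block)
        col.column = col.column->cloneResized(10); // 10 rows per constant block
    blocks.push_back(std::move(block));
}
spiller.spillBlocks(std::move(blocks), /*partition_id=*/0);
spiller.finishSpill();

// On restore, the 100 recorded rows are dealt round-robin into at most
// for_all_constant_max_streams streams; each stream is a ConstantsBlockInputStream
// that regenerates constant blocks of up to for_all_constant_block_size rows
// through IColumn::cloneResized, so no file I/O happens on this path either.
BlockInputStreams restored = spiller.restoreBlocks(/*partition_id=*/0, /*max_stream_size=*/0);

The round-robin split in Spiller::restoreBlocks is what lets a single recorded row count be turned back into several independent input streams, matching the stream counts the join/sort restore code expects from a normal on-disk spill.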