Skip to content

Commit

Permalink
.*: Pre refine for supporting join spill in pipeline (#7742)
Browse files Browse the repository at this point in the history
ref #6518
  • Loading branch information
SeaRise authored Jul 7, 2023
1 parent e98f5d6 commit accc4f4
Show file tree
Hide file tree
Showing 29 changed files with 523 additions and 241 deletions.
1 change: 0 additions & 1 deletion dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,6 @@ namespace DB
F(type_cpu, {{"type", "cpu"}}, ExpBuckets{0.005, 2, 20}), \
F(type_io, {{"type", "io"}}, ExpBuckets{0.005, 2, 20})) \
M(tiflash_pipeline_task_change_to_status, "pipeline task change to status", Counter, \
F(type_to_init, {"type", "to_init"}), \
F(type_to_waiting, {"type", "to_waiting"}), \
F(type_to_running, {"type", "to_running"}), \
F(type_to_io, {"type", "to_io"}), \
Expand Down
12 changes: 11 additions & 1 deletion dbms/src/Core/SpillConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,24 @@ bool needReplace(char c)
return std::isspace(c) || String::npos != forbidden_or_unusual_chars.find(c);
}
} // namespace
SpillConfig::SpillConfig(const DB::String & spill_dir_, const DB::String & spill_id_, size_t max_cached_data_bytes_in_spiller_, size_t max_spilled_rows_per_file_, size_t max_spilled_bytes_per_file_, const FileProviderPtr & file_provider_)
SpillConfig::SpillConfig(
const DB::String & spill_dir_,
const DB::String & spill_id_,
size_t max_cached_data_bytes_in_spiller_,
size_t max_spilled_rows_per_file_,
size_t max_spilled_bytes_per_file_,
const FileProviderPtr & file_provider_,
UInt64 for_all_constant_max_streams_,
UInt64 for_all_constant_block_size_)
: spill_dir(spill_dir_)
, spill_id(spill_id_)
, spill_id_as_file_name_prefix(spill_id)
, max_cached_data_bytes_in_spiller(max_cached_data_bytes_in_spiller_)
, max_spilled_rows_per_file(max_spilled_rows_per_file_)
, max_spilled_bytes_per_file(max_spilled_bytes_per_file_)
, file_provider(file_provider_)
, for_all_constant_max_streams(std::max(1, for_all_constant_max_streams_))
, for_all_constant_block_size(std::max(1, for_all_constant_block_size_))
{
RUNTIME_CHECK_MSG(!spill_dir.empty(), "Spiller dir must be non-empty");
RUNTIME_CHECK_MSG(!spill_id.empty(), "Spiller id must be non-empty");
Expand Down
14 changes: 13 additions & 1 deletion dbms/src/Core/SpillConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#pragma once

#include <Core/Defines.h>
#include <Encryption/FileProvider_fwd.h>
#include <common/types.h>

Expand All @@ -23,7 +24,15 @@ namespace DB
struct SpillConfig
{
public:
SpillConfig(const String & spill_dir_, const String & spill_id_, size_t max_cached_data_bytes_in_spiller_, size_t max_spilled_rows_per_file_, size_t max_spilled_bytes_per_file_, const FileProviderPtr & file_provider_);
SpillConfig(
const String & spill_dir_,
const String & spill_id_,
size_t max_cached_data_bytes_in_spiller_,
size_t max_spilled_rows_per_file_,
size_t max_spilled_bytes_per_file_,
const FileProviderPtr & file_provider_,
UInt64 for_all_constant_max_streams_ = 1,
UInt64 for_all_constant_block_size_ = DEFAULT_BLOCK_SIZE);
String spill_dir;
String spill_id;
String spill_id_as_file_name_prefix;
Expand All @@ -34,5 +43,8 @@ struct SpillConfig
/// soft limit of the max bytes per spilled file
UInt64 max_spilled_bytes_per_file;
FileProviderPtr file_provider;

UInt64 for_all_constant_max_streams;
UInt64 for_all_constant_block_size;
};
} // namespace DB
86 changes: 55 additions & 31 deletions dbms/src/Core/SpillHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,49 +79,64 @@ void SpillHandler::spillBlocks(Blocks && blocks)
/// todo check the disk usage
if (unlikely(blocks.empty()))
return;

RUNTIME_CHECK_MSG(current_spilled_file_index != INVALID_CURRENT_SPILLED_FILE_INDEX, "{}: spill after the spill handler meeting error or finished.", spiller->config.spill_id);
RUNTIME_CHECK_MSG(spiller->isSpillFinished() == false, "{}: spill after the spiller is finished.", spiller->config.spill_id);

try
{
Stopwatch watch;
RUNTIME_CHECK_MSG(spiller->isSpillFinished() == false, "{}: spill after the spiller is finished.", spiller->config.spill_id);
auto block_size = blocks.size();
LOG_DEBUG(spiller->logger, "Spilling {} blocks data", block_size);

FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_during_spill);

size_t total_rows = 0;
size_t rows_in_file = 0;
size_t bytes_in_file = 0;
for (auto & block : blocks)
if unlikely (spiller->isAllConstant())
{
if (unlikely(!block || block.rows() == 0))
continue;
/// erase constant column
spiller->removeConstantColumns(block);
RUNTIME_CHECK_MSG(block.columns() > 0, "Try to spill blocks containing only constant columns, it is meaningless to spill blocks containing only constant columns");
if (unlikely(writer == nullptr))
LOG_WARNING(spiller->logger, "Try to spill blocks containing only constant columns, it is meaningless to spill blocks containing only constant columns");
for (auto & block : blocks)
{
std::tie(rows_in_file, bytes_in_file) = setUpNextSpilledFile();
if (unlikely(!block || block.rows() == 0))
continue;
all_constant_block_rows += block.rows();
}
auto rows = block.rows();
total_rows += rows;
rows_in_file += rows;
bytes_in_file += block.estimateBytesForSpill();
writer->write(block);
block.clear();
if (spiller->enable_append_write && isSpilledFileFull(rows_in_file, bytes_in_file))
}
else
{
Stopwatch watch;
auto block_size = blocks.size();
LOG_DEBUG(spiller->logger, "Spilling {} blocks data", block_size);

size_t total_rows = 0;
size_t rows_in_file = 0;
size_t bytes_in_file = 0;
for (auto & block : blocks)
{
spilled_files[current_spilled_file_index]->updateSpillDetails(writer->finishWrite());
spilled_files[current_spilled_file_index]->markFull();
writer = nullptr;
if (unlikely(!block || block.rows() == 0))
continue;
/// erase constant column
spiller->removeConstantColumns(block);
RUNTIME_CHECK(block.columns() > 0);
if (unlikely(writer == nullptr))
{
std::tie(rows_in_file, bytes_in_file) = setUpNextSpilledFile();
}
auto rows = block.rows();
total_rows += rows;
rows_in_file += rows;
bytes_in_file += block.estimateBytesForSpill();
writer->write(block);
block.clear();
if (spiller->enable_append_write && isSpilledFileFull(rows_in_file, bytes_in_file))
{
spilled_files[current_spilled_file_index]->updateSpillDetails(writer->finishWrite());
spilled_files[current_spilled_file_index]->markFull();
writer = nullptr;
}
}
double cost = watch.elapsedSeconds();
time_cost += cost;
LOG_DEBUG(spiller->logger, "Spilled {} rows from {} blocks into temporary file, time cost: {:.3f} sec.", total_rows, block_size, cost);
RUNTIME_CHECK_MSG(current_spilled_file_index != INVALID_CURRENT_SPILLED_FILE_INDEX, "{}: spill after the spill handler is finished.", spiller->config.spill_id);
}
double cost = watch.elapsedSeconds();
time_cost += cost;
LOG_DEBUG(spiller->logger, "Spilled {} rows from {} blocks into temporary file, time cost: {:.3f} sec.", total_rows, block_size, cost);
RUNTIME_CHECK_MSG(current_spilled_file_index != INVALID_CURRENT_SPILLED_FILE_INDEX, "{}: spill after the spill handler is finished.", spiller->config.spill_id);

RUNTIME_CHECK_MSG(spiller->isSpillFinished() == false, "{}: spill after the spiller is finished.", spiller->config.spill_id);
return;
}
catch (...)
{
Expand Down Expand Up @@ -174,5 +189,14 @@ void SpillHandler::finish()
current_spilled_file_index = INVALID_CURRENT_SPILLED_FILE_INDEX;
RUNTIME_CHECK_MSG(spiller->isSpillFinished() == false, "{}: spill after the spiller is finished.", spiller->config.spill_id);
}
else if unlikely (spiller->isAllConstant())
{
if (all_constant_block_rows > 0)
{
spiller->recordAllConstantBlockRows(partition_id, all_constant_block_rows);
spiller->has_spilled_data = true;
all_constant_block_rows = 0;
}
}
}
} // namespace DB
1 change: 1 addition & 0 deletions dbms/src/Core/SpillHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class SpillHandler
};
Spiller * spiller;
std::vector<std::unique_ptr<SpilledFile>> spilled_files;
UInt64 all_constant_block_rows = 0;
size_t partition_id;
Int64 current_spilled_file_index;
String current_spill_file_name;
Expand Down
Loading

0 comments on commit accc4f4

Please sign in to comment.