Skip to content

Commit

Permalink
refine
Browse files Browse the repository at this point in the history
Signed-off-by: xufei <xufeixw@mail.ustc.edu.cn>
  • Loading branch information
windtalker committed Aug 21, 2023
1 parent 2d6bb0e commit 3333aba
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 35 deletions.
2 changes: 1 addition & 1 deletion dbms/src/Common/Stopwatch.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ class Stopwatch
};

UInt64 elapsedMillisecondsFromLastTime() { return elapsedFromLastTime() / 1000000UL; }
UInt64 elapsedSecondsFromLastTime() { return elapsedFromLastTime() / 1000000UL; }
UInt64 elapsedSecondsFromLastTime() { return elapsedFromLastTime() / 1000000000UL; }

private:
UInt64 start_ns = 0;
Expand Down
11 changes: 8 additions & 3 deletions dbms/src/Core/OperatorSpillContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ namespace DB
{
enum class AutoSpillStatus
{
/// auto spill is not needed or current auto spill already finished
/// NO_NEED_AUTO_SPILL means auto spill is not needed or current auto spill already finished
NO_NEED_AUTO_SPILL,
/// auto spill is needed
/// WAI_SPILL_FINISH means the operator is aware that it needs spill, but spill does not finish yet
WAIT_SPILL_FINISH,
/// NEED_AUTO_SPILL means to mark the operator to spill, the operator itself may not aware that it needs spill yet
NEED_AUTO_SPILL,
};

Expand All @@ -35,6 +37,7 @@ class OperatorSpillContext
std::atomic<bool> in_spillable_stage{true};
std::atomic<bool> is_spilled{false};
bool enable_spill = true;
bool auto_spill_mode = false;
String op_name;
LoggerPtr log;

Expand All @@ -47,7 +50,9 @@ class OperatorSpillContext
: operator_spill_threshold(operator_spill_threshold_)
, op_name(op_name_)
, log(log_)
{}
{
auto_spill_mode = supportAutoTriggerSpill() && operator_spill_threshold == 0;
}
virtual ~OperatorSpillContext() = default;
bool isSpillEnabled() const { return enable_spill && (supportAutoTriggerSpill() || operator_spill_threshold > 0); }
void disableSpill() { enable_spill = false; }
Expand Down
31 changes: 20 additions & 11 deletions dbms/src/Core/QueryOperatorSpillContexts.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ namespace DB
class QueryOperatorSpillContexts
{
public:
QueryOperatorSpillContexts(const MPPQueryId & query_id, UInt64 auto_spill_check_min_interval_ms_)
: auto_spill_check_min_interval_ms(
auto_spill_check_min_interval_ms_ == 0 ? std::numeric_limits<UInt64>::max()
: auto_spill_check_min_interval_ms_)
QueryOperatorSpillContexts(const MPPQueryId & query_id, UInt64 auto_spill_check_min_interval_ms)
: auto_spill_check_min_interval_ns(
auto_spill_check_min_interval_ms == 0 ? std::numeric_limits<UInt64>::max()
: auto_spill_check_min_interval_ms * 1000000ULL)
, log(Logger::get(query_id.toString()))
{
watch.start();
Expand All @@ -37,16 +37,24 @@ class QueryOperatorSpillContexts
/// use mutex to avoid concurrent check
if (lock.owns_lock())
{
auto log_level = Poco::Message::PRIO_TRACE;
if unlikely (!first_check)
{
first_check = true;
log_level = Poco::Message::PRIO_INFORMATION;
LOG_INFO(log, "Query memory usage exceeded threshold, trigger auto spill check");
}
else

LOG_IMPL(log, log_level, "Query memory usage exceeded threshold, trigger auto spill check, expected released memory: {}", expected_released_memories);

if (watch.elapsedFromLastTime() < auto_spill_check_min_interval_ns)
{
if (watch.elapsedMillisecondsFromLastTime() < auto_spill_check_min_interval_ms)
return expected_released_memories;
LOG_IMPL(log, log_level, "Auto spill check still in cooldown time, skip this check");
return expected_released_memories;
}

auto ret = expected_released_memories;

/// vector of <revocable_memories, task_operator_spill_contexts>
std::vector<std::pair<Int64, TaskOperatorSpillContexts *>> revocable_memories;
revocable_memories.reserve(task_operator_spill_contexts_list.size());
Expand All @@ -69,11 +77,12 @@ class QueryOperatorSpillContexts
{
if (pair.first < OperatorSpillContext::MIN_SPILL_THRESHOLD)
break;
expected_released_memories = pair.second->triggerAutoSpill(expected_released_memories);
if (expected_released_memories <= 0)
ret = pair.second->triggerAutoSpill(ret);
if (ret <= 0)
break;
}
return expected_released_memories;
LOG_IMPL(log, log_level, "Auto spill check finished, marked {} memory to be spilled", expected_released_memories - ret);
return ret;
}
return expected_released_memories;
}
Expand All @@ -96,7 +105,7 @@ class QueryOperatorSpillContexts
private:
std::list<std::shared_ptr<TaskOperatorSpillContexts>> task_operator_spill_contexts_list;
bool first_check = false;
const UInt64 auto_spill_check_min_interval_ms;
const UInt64 auto_spill_check_min_interval_ns;
LoggerPtr log;
mutable std::mutex mutex;
Stopwatch watch;
Expand Down
31 changes: 29 additions & 2 deletions dbms/src/Flash/FlashService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,32 @@ void updateSettingsFromTiDB(const grpc::ServerContext * grpc_context, ContextPtr
}
}
}

void updateSettingsForAutoSpill(ContextPtr & context, const LoggerPtr & log)
{
if (context->getSettingsRef().max_memory_usage.getActualBytes(1024 * 1024 * 1024ULL) > 0)
{
/// auto spill is set, disable operator spill threshold
bool need_log_warning = false;
if (context->getSettingsRef().max_bytes_before_external_sort > 0)
{
need_log_warning = true;
context->setSetting("max_bytes_before_external_sort", "0");
}
if (context->getSettingsRef().max_bytes_before_external_group_by > 0)
{
need_log_warning = true;
context->setSetting("max_bytes_before_external_group_by", "0");
}
if (context->getSettingsRef().max_bytes_before_external_join > 0)
{
need_log_warning = true;
context->setSetting("max_bytes_before_external_join", "0");
}
if (need_log_warning)
LOG_WARNING(log, "auto spill is enabled, so per operator's memory threshold is disabled");
}
}
} // namespace

grpc::Status FlashService::Coprocessor(
Expand Down Expand Up @@ -683,7 +709,7 @@ ::grpc::Status FlashService::cancelMPPTaskForTest(
CPUAffinityManager::getInstance().bindSelfGrpcThread();
// CancelMPPTask cancels the query of the task.
LOG_INFO(log, "cancel mpp task request: {}", request->DebugString());
auto [context, status] = createDBContextForTest();
auto [test_context, status] = createDBContextForTest();
if (!status.ok())
{
auto err = std::make_unique<mpp::Error>();
Expand All @@ -692,7 +718,7 @@ ::grpc::Status FlashService::cancelMPPTaskForTest(
response->set_allocated_error(err.release());
return status;
}
auto & tmt_context = context->getTMTContext();
auto & tmt_context = test_context->getTMTContext();
auto task_manager = tmt_context.getMPPTaskManager();
task_manager->abortMPPGather(
MPPGatherId(request->meta()),
Expand Down Expand Up @@ -756,6 +782,7 @@ std::tuple<ContextPtr, grpc::Status> FlashService::createDBContext(const grpc::S
}

updateSettingsFromTiDB(grpc_context, tmp_context, log);
updateSettingsForAutoSpill(tmp_context, log);

tmp_context->setSetting("enable_async_server", is_async ? "true" : "false");
tmp_context->setSetting("enable_local_tunnel", enable_local_tunnel ? "true" : "false");
Expand Down
27 changes: 18 additions & 9 deletions dbms/src/Interpreters/AggSpillContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,21 @@ bool AggSpillContext::updatePerThreadRevocableMemory(Int64 new_value, size_t thr
if (!in_spillable_stage || !enable_spill)
return false;
per_thread_revocable_memories[thread_num] = new_value;
if (per_thread_auto_spill_status[thread_num] == AutoSpillStatus::NEED_AUTO_SPILL
|| (per_thread_spill_threshold > 0 && new_value > static_cast<Int64>(per_thread_spill_threshold)))
if (auto_spill_mode)
{
per_thread_revocable_memories[thread_num] = 0;
return true;
AutoSpillStatus old_value = AutoSpillStatus::NEED_AUTO_SPILL;
if (per_thread_auto_spill_status[thread_num].compare_exchange_strong(old_value, AutoSpillStatus::WAIT_SPILL_FINISH))
/// in auto spill mode, don't set revocable_memory to 0 here, so in triggerSpill it will take
/// the revocable_memory into account if current spill is on the way
return true;
}
else
{
if (per_thread_spill_threshold > 0 && new_value > static_cast<Int64>(per_thread_spill_threshold))
{
per_thread_revocable_memories[thread_num] = 0;
return true;
}
}
return false;
}
Expand All @@ -67,17 +77,15 @@ Int64 AggSpillContext::triggerSpill(Int64 expected_released_memories)
{
if (!in_spillable_stage || !enable_spill)
return expected_released_memories;
RUNTIME_CHECK_MSG(operator_spill_threshold == 0, "The operator spill threshold should be 0 in auto spill mode");
auto total_revocable_memory = getTotalRevocableMemory();
if (total_revocable_memory >= MIN_SPILL_THRESHOLD)
{
for (size_t i = 0; i < per_thread_revocable_memories.size() && expected_released_memories > 0; i++)
{
AutoSpillStatus old_value = AutoSpillStatus::NO_NEED_AUTO_SPILL;
if (per_thread_auto_spill_status[i].compare_exchange_strong(old_value, AutoSpillStatus::NEED_AUTO_SPILL))
{
expected_released_memories = std::max(expected_released_memories - per_thread_revocable_memories[i], 0);
per_thread_revocable_memories[i] = 0;
}
per_thread_auto_spill_status[i].compare_exchange_strong(old_value, AutoSpillStatus::NEED_AUTO_SPILL);
expected_released_memories = std::max(expected_released_memories - per_thread_revocable_memories[i], 0);
}
}
return expected_released_memories;
Expand All @@ -86,5 +94,6 @@ Int64 AggSpillContext::triggerSpill(Int64 expected_released_memories)
void AggSpillContext::finishOneSpill(size_t thread_num)
{
per_thread_auto_spill_status[thread_num] = AutoSpillStatus::NO_NEED_AUTO_SPILL;
per_thread_revocable_memories[thread_num] = 0;
}
} // namespace DB
27 changes: 18 additions & 9 deletions dbms/src/Interpreters/SortSpillContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,34 +34,43 @@ bool SortSpillContext::updateRevocableMemory(Int64 new_value)
if (!in_spillable_stage || !enable_spill)
return false;
revocable_memory = new_value;
if (auto_spill_status == AutoSpillStatus::NEED_AUTO_SPILL
|| (operator_spill_threshold > 0 && revocable_memory > static_cast<Int64>(operator_spill_threshold)))
if (auto_spill_mode)
{
revocable_memory = 0;
return true;
AutoSpillStatus old_value = AutoSpillStatus::NEED_AUTO_SPILL;
if (auto_spill_status.compare_exchange_strong(old_value, AutoSpillStatus::WAIT_SPILL_FINISH))
/// in auto spill mode, don't set revocable_memory to 0 here, so in triggerSpill it will take
/// the revocable_memory into account if current spill is on the way
return true;
}
else
{
if (operator_spill_threshold > 0 && revocable_memory > static_cast<Int64>(operator_spill_threshold))
{
revocable_memory = 0;
return true;
}
}
return false;
}

Int64 SortSpillContext::triggerSpill(Int64 expected_released_memories)
{
RUNTIME_CHECK_MSG(operator_spill_threshold == 0, "The operator spill threshold should be 0 in auto spill mode");
if (!in_spillable_stage || !enable_spill)
return expected_released_memories;
auto total_revocable_memory = getTotalRevocableMemory();
if (total_revocable_memory >= MIN_SPILL_THRESHOLD)
{
AutoSpillStatus old_value = AutoSpillStatus::NO_NEED_AUTO_SPILL;
if (auto_spill_status.compare_exchange_strong(old_value, AutoSpillStatus::NEED_AUTO_SPILL))
{
expected_released_memories = std::max(expected_released_memories - total_revocable_memory, 0);
revocable_memory = 0;
}
auto_spill_status.compare_exchange_strong(old_value, AutoSpillStatus::NEED_AUTO_SPILL);
expected_released_memories = std::max(expected_released_memories - revocable_memory, 0);
}
return expected_released_memories;
}

void SortSpillContext::finishOneSpill()
{
auto_spill_status = AutoSpillStatus::NO_NEED_AUTO_SPILL;
revocable_memory = 0;
}
} // namespace DB

0 comments on commit 3333aba

Please sign in to comment.