Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Some refine of auto spill check #7935

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dbms/src/Common/Stopwatch.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ class Stopwatch
};

UInt64 elapsedMillisecondsFromLastTime() { return elapsedFromLastTime() / 1000000UL; }
UInt64 elapsedSecondsFromLastTime() { return elapsedFromLastTime() / 1000000UL; }
UInt64 elapsedSecondsFromLastTime() { return elapsedFromLastTime() / 1000000000UL; }

private:
UInt64 start_ns = 0;
Expand Down
8 changes: 6 additions & 2 deletions dbms/src/Core/OperatorSpillContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ namespace DB
{
/// Auto spill state of an operator; the meaning of each state is given on the enumerator.
enum class AutoSpillStatus
{
/// NO_NEED_AUTO_SPILL means auto spill is not needed, or the current auto spill has already finished
NO_NEED_AUTO_SPILL,
/// WAIT_SPILL_FINISH means the operator is aware that it needs to spill, but the spill has not finished yet
WAIT_SPILL_FINISH,
/// NEED_AUTO_SPILL means the operator has been marked to spill; the operator itself may not be aware
/// that it needs to spill yet
NEED_AUTO_SPILL,
};

Expand All @@ -35,6 +37,7 @@ class OperatorSpillContext
std::atomic<bool> in_spillable_stage{true};
std::atomic<bool> is_spilled{false};
bool enable_spill = true;
bool auto_spill_mode = false;
String op_name;
LoggerPtr log;

Expand All @@ -45,6 +48,7 @@ class OperatorSpillContext
const static Int64 MIN_SPILL_THRESHOLD = 10ULL * 1024 * 1024;
OperatorSpillContext(UInt64 operator_spill_threshold_, const String op_name_, const LoggerPtr & log_)
: operator_spill_threshold(operator_spill_threshold_)
, auto_spill_mode(operator_spill_threshold == 0)
, op_name(op_name_)
, log(log_)
{}
Expand Down
83 changes: 83 additions & 0 deletions dbms/src/Core/QueryOperatorSpillContexts.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Core/QueryOperatorSpillContexts.h>

namespace DB
{
/// Run one query-level auto spill check and ask tasks to spill until
/// `expected_released_memories` is covered.
///
/// The mutex is taken with try_to_lock, so a caller that cannot get the lock
/// returns immediately instead of waiting for the running check. Every check
/// except the very first one is also skipped while
/// `watch.elapsedFromLastTime()` is below `auto_spill_check_min_interval_ns`.
///
/// Tasks are visited from the largest revocable memory to the smallest; the walk
/// stops at the first task below `OperatorSpillContext::MIN_SPILL_THRESHOLD` or
/// as soon as nothing remains to be released.
///
/// Returns the amount of memory that still has to be released after this check.
/// When the check is skipped, `expected_released_memories` is returned unchanged.
///
/// NOTE(review): the cooldown relies on how `Stopwatch::elapsedFromLastTime()`
/// tracks its reference point; if every call restarts it, checks arriving faster
/// than the interval would keep being skipped - confirm against Stopwatch.
Int64 QueryOperatorSpillContexts::triggerAutoSpill(Int64 expected_released_memories)
{
    /// use mutex to avoid concurrent check
    std::unique_lock lock(mutex, std::try_to_lock);
    if (!lock.owns_lock())
        return expected_released_memories;

    /// the first check is logged at information level and is never subject to the cooldown
    const bool is_first_check = !first_check_done;
    if unlikely (is_first_check)
        first_check_done = true;
    auto log_level = is_first_check ? Poco::Message::PRIO_INFORMATION : Poco::Message::PRIO_TRACE;

    LOG_IMPL(
        log,
        log_level,
        "Query memory usage exceeded threshold, trigger auto spill check, expected released memory: {}",
        expected_released_memories);

    if (!is_first_check && watch.elapsedFromLastTime() < auto_spill_check_min_interval_ns)
    {
        LOG_IMPL(log, log_level, "Auto spill check still in cooldown time, skip this check");
        return expected_released_memories;
    }

    /// vector of <revocable_memories, task_operator_spill_contexts>, filled in the same pass
    /// that drops finished tasks from the list
    std::vector<std::pair<Int64, TaskOperatorSpillContexts *>> revocable_memories;
    revocable_memories.reserve(task_operator_spill_contexts_list.size());
    auto task_it = task_operator_spill_contexts_list.begin();
    while (task_it != task_operator_spill_contexts_list.end())
    {
        if (!(*task_it)->isFinished())
        {
            revocable_memories.emplace_back((*task_it)->totalRevocableMemories(), task_it->get());
            ++task_it;
            continue;
        }
        task_it = task_operator_spill_contexts_list.erase(task_it);
    }

    /// largest revocable memory first
    std::sort(revocable_memories.begin(), revocable_memories.end(), [](const auto & lhs, const auto & rhs) {
        return rhs.first < lhs.first;
    });

    Int64 remaining = expected_released_memories;
    for (const auto & [revocable_memory, task_contexts] : revocable_memories)
    {
        /// entries are sorted in descending order, so no later task can reach the threshold either
        if (revocable_memory < OperatorSpillContext::MIN_SPILL_THRESHOLD)
            break;
        remaining = task_contexts->triggerAutoSpill(remaining);
        if (remaining <= 0)
            break;
    }

    LOG_IMPL(
        log,
        log_level,
        "Auto spill check finished, marked {} memory to be spilled",
        expected_released_memories - remaining);
    return remaining;
}
} // namespace DB
53 changes: 10 additions & 43 deletions dbms/src/Core/QueryOperatorSpillContexts.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#pragma once

#include <Common/Stopwatch.h>
#include <Core/TaskOperatorSpillContexts.h>
#include <Flash/Mpp/MPPTaskId.h>

Expand All @@ -22,51 +23,15 @@ namespace DB
class QueryOperatorSpillContexts
{
public:
explicit QueryOperatorSpillContexts(const MPPQueryId & query_id)
: log(Logger::get(query_id.toString()))
{}
Int64 triggerAutoSpill(Int64 expected_released_memories)
QueryOperatorSpillContexts(const MPPQueryId & query_id, UInt64 auto_spill_check_min_interval_ms)
: auto_spill_check_min_interval_ns(auto_spill_check_min_interval_ms * 1000000ULL)
, log(Logger::get(query_id.toString()))
{
std::unique_lock lock(mutex, std::try_to_lock);
/// use mutex to avoid concurrent check, todo maybe need add minimum check interval(like 100ms) here?
if (lock.owns_lock())
{
if unlikely (!first_check)
{
first_check = true;
LOG_INFO(log, "Query memory usage exceeded threshold, trigger auto spill check");
}
/// vector of <revocable_memories, task_operator_spill_contexts>
std::vector<std::pair<Int64, TaskOperatorSpillContexts *>> revocable_memories;
revocable_memories.reserve(task_operator_spill_contexts_list.size());
for (auto it = task_operator_spill_contexts_list.begin(); it != task_operator_spill_contexts_list.end();)
{
if ((*it)->isFinished())
{
it = task_operator_spill_contexts_list.erase(it);
}
else
{
revocable_memories.emplace_back((*it)->totalRevocableMemories(), (*it).get());
++it;
}
}
std::sort(revocable_memories.begin(), revocable_memories.end(), [](const auto & a, const auto & b) {
return a.first > b.first;
});
for (auto & pair : revocable_memories)
{
if (pair.first < OperatorSpillContext::MIN_SPILL_THRESHOLD)
break;
expected_released_memories = pair.second->triggerAutoSpill(expected_released_memories);
if (expected_released_memories <= 0)
break;
}
return expected_released_memories;
}
return expected_released_memories;
watch.start();
}

Int64 triggerAutoSpill(Int64 expected_released_memories);

void registerTaskOperatorSpillContexts(
const std::shared_ptr<TaskOperatorSpillContexts> & task_operator_spill_contexts)
{
Expand All @@ -84,9 +49,11 @@ class QueryOperatorSpillContexts

private:
std::list<std::shared_ptr<TaskOperatorSpillContexts>> task_operator_spill_contexts_list;
bool first_check = false;
bool first_check_done = false;
const UInt64 auto_spill_check_min_interval_ns;
LoggerPtr log;
mutable std::mutex mutex;
Stopwatch watch;
};

} // namespace DB
78 changes: 78 additions & 0 deletions dbms/src/Core/TaskOperatorSpillContexts.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Core/TaskOperatorSpillContexts.h>

namespace DB
{
/// Ask the operators of this task to spill until `expected_released_memories` is covered.
///
/// Does nothing when the task is already finished. Otherwise, newly registered
/// contexts are merged in first, then each context that is still in its spillable
/// stage is triggered in list order until nothing remains to be released.
/// Contexts found to be past their spillable stage are removed from the list afterwards.
///
/// Returns the amount of memory that still has to be released.
Int64 TaskOperatorSpillContexts::triggerAutoSpill(Int64 expected_released_memories)
{
    if (isFinished())
        return expected_released_memories;

    appendAdditionalOperatorSpillContexts();

    Int64 remaining = expected_released_memories;
    bool saw_finished_context = false;
    for (auto & context : operator_spill_contexts)
    {
        assert(context->supportAutoTriggerSpill());
        if (!context->spillableStageFinished())
        {
            remaining = context->triggerSpill(remaining);
            if (remaining <= 0)
                break;
        }
        else
        {
            /// remember that a cleanup pass is needed, but keep scanning
            saw_finished_context = true;
        }
    }

    /// clean finished spill context
    if (saw_finished_context)
        operator_spill_contexts.remove_if([](const auto & context) { return context->spillableStageFinished(); });

    return remaining;
}
/// Move every context waiting in `additional_operator_spill_contexts` to the end of
/// `operator_spill_contexts` and reset the "has additional" flag.
void TaskOperatorSpillContexts::appendAdditionalOperatorSpillContexts()
{
    /// nothing was registered since the last merge, so the mutex is not taken
    if (!has_additional_operator_spill_contexts)
        return;

    std::unique_lock lock(mutex);
    auto & pending_contexts = additional_operator_spill_contexts;
    operator_spill_contexts.splice(operator_spill_contexts.end(), pending_contexts);
    has_additional_operator_spill_contexts = false;
    pending_contexts.clear();
}
/// Queue `operator_spill_context` in `additional_operator_spill_contexts` under the mutex.
/// Contexts that do not support auto-triggered spill are ignored.
void TaskOperatorSpillContexts::registerOperatorSpillContext(const OperatorSpillContextPtr & operator_spill_context)
{
    if (!operator_spill_context->supportAutoTriggerSpill())
        return;

    std::unique_lock lock(mutex);
    additional_operator_spill_contexts.push_back(operator_spill_context);
    /// signals appendAdditionalOperatorSpillContexts() that there is something to merge
    has_additional_operator_spill_contexts = true;
}
/// Sum of `getTotalRevocableMemory()` over all operator spill contexts of this task.
/// Returns 0 when the task is already finished. Newly registered contexts are
/// merged in before summing.
Int64 TaskOperatorSpillContexts::totalRevocableMemories()
{
    if unlikely (isFinished())
        return 0;

    appendAdditionalOperatorSpillContexts();

    Int64 total_revocable = 0;
    for (auto it = operator_spill_contexts.cbegin(); it != operator_spill_contexts.cend(); ++it)
        total_revocable += (*it)->getTotalRevocableMemory();
    return total_revocable;
}
} // namespace DB
64 changes: 5 additions & 59 deletions dbms/src/Core/TaskOperatorSpillContexts.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,45 +21,9 @@ namespace DB
class TaskOperatorSpillContexts
{
public:
Int64 triggerAutoSpill(Int64 expected_released_memories)
{
if (isFinished())
return expected_released_memories;
appendAdditionalOperatorSpillContexts();
bool has_finished_operator_spill_contexts = false;
for (auto & operator_spill_context : operator_spill_contexts)
{
assert(operator_spill_context->supportAutoTriggerSpill());
if (operator_spill_context->spillableStageFinished())
{
has_finished_operator_spill_contexts = true;
continue;
}
expected_released_memories = operator_spill_context->triggerSpill(expected_released_memories);
if (expected_released_memories <= 0)
break;
}
if (has_finished_operator_spill_contexts)
{
/// clean finished spill context
operator_spill_contexts.erase(
std::remove_if(
operator_spill_contexts.begin(),
operator_spill_contexts.end(),
[](const auto & context) { return context->spillableStageFinished(); }),
operator_spill_contexts.end());
}
return expected_released_memories;
}
void registerOperatorSpillContext(const OperatorSpillContextPtr & operator_spill_context)
{
if (operator_spill_context->supportAutoTriggerSpill())
{
std::unique_lock lock(mutex);
additional_operator_spill_contexts.push_back(operator_spill_context);
has_additional_operator_spill_contexts = true;
}
}
Int64 triggerAutoSpill(Int64 expected_released_memories);

void registerOperatorSpillContext(const OperatorSpillContextPtr & operator_spill_context);
/// for tests
size_t operatorSpillContextCount()
{
Expand All @@ -73,32 +37,14 @@ class TaskOperatorSpillContexts
return additional_operator_spill_contexts.size();
}

Int64 totalRevocableMemories()
{
if unlikely (isFinished())
return 0;
appendAdditionalOperatorSpillContexts();
Int64 ret = 0;
for (const auto & operator_spill_context : operator_spill_contexts)
ret += operator_spill_context->getTotalRevocableMemory();
return ret;
}
Int64 totalRevocableMemories();

bool isFinished() const { return is_task_finished; }

void finish() { is_task_finished = true; }

private:
void appendAdditionalOperatorSpillContexts()
{
if (has_additional_operator_spill_contexts)
{
std::unique_lock lock(mutex);
operator_spill_contexts.splice(operator_spill_contexts.end(), additional_operator_spill_contexts);
has_additional_operator_spill_contexts = false;
additional_operator_spill_contexts.clear();
}
}
void appendAdditionalOperatorSpillContexts();
/// access to operator_spill_contexts is thread safe
std::list<OperatorSpillContextPtr> operator_spill_contexts;
mutable std::mutex mutex;
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/EstablishCall.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ void EstablishCallData::initRpc()
void EstablishCallData::tryConnectTunnel()
{
auto * task_manager = service->getContext()->getTMTContext().getMPPTaskManager().get();
auto [tunnel, err_msg] = task_manager->findAsyncTunnel(&request, this, cq);
auto [tunnel, err_msg] = task_manager->findAsyncTunnel(&request, this, cq, *service->getContext());
if (tunnel == nullptr && err_msg.empty())
{
/// Call data will be put to cq by alarm, just return is ok
Expand Down
Loading