Skip to content

Commit

Permalink
save work
Browse files Browse the repository at this point in the history
Signed-off-by: xufei <xufeixw@mail.ustc.edu.cn>
  • Loading branch information
windtalker committed Aug 18, 2023
1 parent 87c0614 commit dbaf4b4
Show file tree
Hide file tree
Showing 5 changed files with 171 additions and 114 deletions.
83 changes: 83 additions & 0 deletions dbms/src/Core/QueryOperatorSpillContexts.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Core/QueryOperatorSpillContexts.h>

namespace DB
{
Int64 QueryOperatorSpillContexts::triggerAutoSpill(Int64 expected_released_memories)
{
std::unique_lock lock(mutex, std::try_to_lock);
/// use mutex to avoid concurrent check
if (lock.owns_lock())
{
auto log_level = Poco::Message::PRIO_TRACE;
bool check_cooldown_time = true;
if unlikely (!first_check_done)
{
first_check_done = true;
check_cooldown_time = false;
log_level = Poco::Message::PRIO_INFORMATION;
}

LOG_IMPL(
log,
log_level,
"Query memory usage exceeded threshold, trigger auto spill check, expected released memory: {}",
expected_released_memories);

if (check_cooldown_time && watch.elapsedFromLastTime() < auto_spill_check_min_interval_ns)
{
LOG_IMPL(log, log_level, "Auto spill check still in cooldown time, skip this check");
return expected_released_memories;
}

auto ret = expected_released_memories;

/// vector of <revocable_memories, task_operator_spill_contexts>
std::vector<std::pair<Int64, TaskOperatorSpillContexts *>> revocable_memories;
revocable_memories.reserve(task_operator_spill_contexts_list.size());
for (auto it = task_operator_spill_contexts_list.begin(); it != task_operator_spill_contexts_list.end();)
{
if ((*it)->isFinished())
{
it = task_operator_spill_contexts_list.erase(it);
}
else
{
revocable_memories.emplace_back((*it)->totalRevocableMemories(), (*it).get());
++it;
}
}
std::sort(revocable_memories.begin(), revocable_memories.end(), [](const auto & a, const auto & b) {
return a.first > b.first;
});
for (auto & pair : revocable_memories)
{
if (pair.first < OperatorSpillContext::MIN_SPILL_THRESHOLD)
break;
ret = pair.second->triggerAutoSpill(ret);
if (ret <= 0)
break;
}
LOG_IMPL(
log,
log_level,
"Auto spill check finished, marked {} memory to be spilled",
expected_released_memories - ret);
return ret;
}
return expected_released_memories;
}
} // namespace DB
56 changes: 2 additions & 54 deletions dbms/src/Core/QueryOperatorSpillContexts.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,60 +29,8 @@ class QueryOperatorSpillContexts
{
watch.start();
}
Int64 triggerAutoSpill(Int64 expected_released_memories)
{
std::unique_lock lock(mutex, std::try_to_lock);
/// use mutex to avoid concurrent check
if (lock.owns_lock())
{
auto log_level = Poco::Message::PRIO_TRACE;
if unlikely (!first_check)
{
first_check = true;
log_level = Poco::Message::PRIO_INFORMATION;
}

LOG_IMPL(log, log_level, "Query memory usage exceeded threshold, trigger auto spill check, expected released memory: {}", expected_released_memories);

if (watch.elapsedFromLastTime() < auto_spill_check_min_interval_ns)
{
LOG_IMPL(log, log_level, "Auto spill check still in cooldown time, skip this check");
return expected_released_memories;
}

auto ret = expected_released_memories;

/// vector of <revocable_memories, task_operator_spill_contexts>
std::vector<std::pair<Int64, TaskOperatorSpillContexts *>> revocable_memories;
revocable_memories.reserve(task_operator_spill_contexts_list.size());
for (auto it = task_operator_spill_contexts_list.begin(); it != task_operator_spill_contexts_list.end();)
{
if ((*it)->isFinished())
{
it = task_operator_spill_contexts_list.erase(it);
}
else
{
revocable_memories.emplace_back((*it)->totalRevocableMemories(), (*it).get());
++it;
}
}
std::sort(revocable_memories.begin(), revocable_memories.end(), [](const auto & a, const auto & b) {
return a.first > b.first;
});
for (auto & pair : revocable_memories)
{
if (pair.first < OperatorSpillContext::MIN_SPILL_THRESHOLD)
break;
ret = pair.second->triggerAutoSpill(ret);
if (ret <= 0)
break;
}
LOG_IMPL(log, log_level, "Auto spill check finished, marked {} memory to be spilled", expected_released_memories - ret);
return ret;
}
return expected_released_memories;
}
Int64 triggerAutoSpill(Int64 expected_released_memories);

void registerTaskOperatorSpillContexts(
const std::shared_ptr<TaskOperatorSpillContexts> & task_operator_spill_contexts)
Expand All @@ -101,7 +49,7 @@ class QueryOperatorSpillContexts

private:
std::list<std::shared_ptr<TaskOperatorSpillContexts>> task_operator_spill_contexts_list;
bool first_check = false;
bool first_check_done = false;
const UInt64 auto_spill_check_min_interval_ns;
LoggerPtr log;
mutable std::mutex mutex;
Expand Down
78 changes: 78 additions & 0 deletions dbms/src/Core/TaskOperatorSpillContexts.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Core/TaskOperatorSpillContexts.h>

namespace DB
{
Int64 TaskOperatorSpillContexts::triggerAutoSpill(Int64 expected_released_memories)
{
if (isFinished())
return expected_released_memories;
appendAdditionalOperatorSpillContexts();
bool has_finished_operator_spill_contexts = false;
for (auto & operator_spill_context : operator_spill_contexts)
{
assert(operator_spill_context->supportAutoTriggerSpill());
if (operator_spill_context->spillableStageFinished())
{
has_finished_operator_spill_contexts = true;
continue;
}
expected_released_memories = operator_spill_context->triggerSpill(expected_released_memories);
if (expected_released_memories <= 0)
break;
}
if (has_finished_operator_spill_contexts)
{
/// clean finished spill context
operator_spill_contexts.erase(
std::remove_if(
operator_spill_contexts.begin(),
operator_spill_contexts.end(),
[](const auto & context) { return context->spillableStageFinished(); }),
operator_spill_contexts.end());
}
return expected_released_memories;
}
void TaskOperatorSpillContexts::appendAdditionalOperatorSpillContexts()
{
if (has_additional_operator_spill_contexts)
{
std::unique_lock lock(mutex);
operator_spill_contexts.splice(operator_spill_contexts.end(), additional_operator_spill_contexts);
has_additional_operator_spill_contexts = false;
additional_operator_spill_contexts.clear();
}
}
void TaskOperatorSpillContexts::registerOperatorSpillContext(const OperatorSpillContextPtr & operator_spill_context)
{
if (operator_spill_context->supportAutoTriggerSpill())
{
std::unique_lock lock(mutex);
additional_operator_spill_contexts.push_back(operator_spill_context);
has_additional_operator_spill_contexts = true;
}
}
Int64 TaskOperatorSpillContexts::totalRevocableMemories()
{
if unlikely (isFinished())
return 0;
appendAdditionalOperatorSpillContexts();
Int64 ret = 0;
for (const auto & operator_spill_context : operator_spill_contexts)
ret += operator_spill_context->getTotalRevocableMemory();
return ret;
}
} // namespace DB
64 changes: 5 additions & 59 deletions dbms/src/Core/TaskOperatorSpillContexts.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,45 +21,9 @@ namespace DB
class TaskOperatorSpillContexts
{
public:
Int64 triggerAutoSpill(Int64 expected_released_memories)
{
if (isFinished())
return expected_released_memories;
appendAdditionalOperatorSpillContexts();
bool has_finished_operator_spill_contexts = false;
for (auto & operator_spill_context : operator_spill_contexts)
{
assert(operator_spill_context->supportAutoTriggerSpill());
if (operator_spill_context->spillableStageFinished())
{
has_finished_operator_spill_contexts = true;
continue;
}
expected_released_memories = operator_spill_context->triggerSpill(expected_released_memories);
if (expected_released_memories <= 0)
break;
}
if (has_finished_operator_spill_contexts)
{
/// clean finished spill context
operator_spill_contexts.erase(
std::remove_if(
operator_spill_contexts.begin(),
operator_spill_contexts.end(),
[](const auto & context) { return context->spillableStageFinished(); }),
operator_spill_contexts.end());
}
return expected_released_memories;
}
void registerOperatorSpillContext(const OperatorSpillContextPtr & operator_spill_context)
{
if (operator_spill_context->supportAutoTriggerSpill())
{
std::unique_lock lock(mutex);
additional_operator_spill_contexts.push_back(operator_spill_context);
has_additional_operator_spill_contexts = true;
}
}
Int64 triggerAutoSpill(Int64 expected_released_memories);

void registerOperatorSpillContext(const OperatorSpillContextPtr & operator_spill_context);
/// for tests
size_t operatorSpillContextCount()
{
Expand All @@ -73,32 +37,14 @@ class TaskOperatorSpillContexts
return additional_operator_spill_contexts.size();
}

Int64 totalRevocableMemories()
{
if unlikely (isFinished())
return 0;
appendAdditionalOperatorSpillContexts();
Int64 ret = 0;
for (const auto & operator_spill_context : operator_spill_contexts)
ret += operator_spill_context->getTotalRevocableMemory();
return ret;
}
Int64 totalRevocableMemories();

bool isFinished() const { return is_task_finished; }

void finish() { is_task_finished = true; }

private:
void appendAdditionalOperatorSpillContexts()
{
if (has_additional_operator_spill_contexts)
{
std::unique_lock lock(mutex);
operator_spill_contexts.splice(operator_spill_contexts.end(), additional_operator_spill_contexts);
has_additional_operator_spill_contexts = false;
additional_operator_spill_contexts.clear();
}
}
void appendAdditionalOperatorSpillContexts();
/// access to operator_spill_contexts is thread safe
std::list<OperatorSpillContextPtr> operator_spill_contexts;
mutable std::mutex mutex;
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Interpreters/AggSpillContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ bool AggSpillContext::updatePerThreadRevocableMemory(Int64 new_value, size_t thr
if (auto_spill_mode)
{
AutoSpillStatus old_value = AutoSpillStatus::NEED_AUTO_SPILL;
if (per_thread_auto_spill_status[thread_num].compare_exchange_strong(old_value, AutoSpillStatus::WAIT_SPILL_FINISH))
if (per_thread_auto_spill_status[thread_num].compare_exchange_strong(
old_value,
AutoSpillStatus::WAIT_SPILL_FINISH))
/// in auto spill mode, don't set revocable_memory to 0 here, so in triggerSpill it will take
/// the revocable_memory into account if current spill is on the way
return true;
Expand Down

0 comments on commit dbaf4b4

Please sign in to comment.