-
Notifications
You must be signed in to change notification settings - Fork 409
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Signed-off-by: xufei <xufeixw@mail.ustc.edu.cn>
- Loading branch information
1 parent
ce24336
commit 7a29a25
Showing
5 changed files
with
171 additions
and
114 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,83 @@ | ||
// Copyright 2023 PingCAP, Inc. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
#include <Core/QueryOperatorSpillContexts.h> | ||
|
||
namespace DB | ||
{ | ||
Int64 QueryOperatorSpillContexts::triggerAutoSpill(Int64 expected_released_memories) | ||
{ | ||
std::unique_lock lock(mutex, std::try_to_lock); | ||
/// use mutex to avoid concurrent check | ||
if (lock.owns_lock()) | ||
{ | ||
auto log_level = Poco::Message::PRIO_TRACE; | ||
bool check_cooldown_time = true; | ||
if unlikely (!first_check_done) | ||
{ | ||
first_check_done = true; | ||
check_cooldown_time = false; | ||
log_level = Poco::Message::PRIO_INFORMATION; | ||
} | ||
|
||
LOG_IMPL( | ||
log, | ||
log_level, | ||
"Query memory usage exceeded threshold, trigger auto spill check, expected released memory: {}", | ||
expected_released_memories); | ||
|
||
if (check_cooldown_time && watch.elapsedFromLastTime() < auto_spill_check_min_interval_ns) | ||
{ | ||
LOG_IMPL(log, log_level, "Auto spill check still in cooldown time, skip this check"); | ||
return expected_released_memories; | ||
} | ||
|
||
auto ret = expected_released_memories; | ||
|
||
/// vector of <revocable_memories, task_operator_spill_contexts> | ||
std::vector<std::pair<Int64, TaskOperatorSpillContexts *>> revocable_memories; | ||
revocable_memories.reserve(task_operator_spill_contexts_list.size()); | ||
for (auto it = task_operator_spill_contexts_list.begin(); it != task_operator_spill_contexts_list.end();) | ||
{ | ||
if ((*it)->isFinished()) | ||
{ | ||
it = task_operator_spill_contexts_list.erase(it); | ||
} | ||
else | ||
{ | ||
revocable_memories.emplace_back((*it)->totalRevocableMemories(), (*it).get()); | ||
++it; | ||
} | ||
} | ||
std::sort(revocable_memories.begin(), revocable_memories.end(), [](const auto & a, const auto & b) { | ||
return a.first > b.first; | ||
}); | ||
for (auto & pair : revocable_memories) | ||
{ | ||
if (pair.first < OperatorSpillContext::MIN_SPILL_THRESHOLD) | ||
break; | ||
ret = pair.second->triggerAutoSpill(ret); | ||
if (ret <= 0) | ||
break; | ||
} | ||
LOG_IMPL( | ||
log, | ||
log_level, | ||
"Auto spill check finished, marked {} memory to be spilled", | ||
expected_released_memories - ret); | ||
return ret; | ||
} | ||
return expected_released_memories; | ||
} | ||
} // namespace DB |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
// Copyright 2023 PingCAP, Inc. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
#include <Core/TaskOperatorSpillContexts.h> | ||
|
||
namespace DB | ||
{ | ||
Int64 TaskOperatorSpillContexts::triggerAutoSpill(Int64 expected_released_memories) | ||
{ | ||
if (isFinished()) | ||
return expected_released_memories; | ||
appendAdditionalOperatorSpillContexts(); | ||
bool has_finished_operator_spill_contexts = false; | ||
for (auto & operator_spill_context : operator_spill_contexts) | ||
{ | ||
assert(operator_spill_context->supportAutoTriggerSpill()); | ||
if (operator_spill_context->spillableStageFinished()) | ||
{ | ||
has_finished_operator_spill_contexts = true; | ||
continue; | ||
} | ||
expected_released_memories = operator_spill_context->triggerSpill(expected_released_memories); | ||
if (expected_released_memories <= 0) | ||
break; | ||
} | ||
if (has_finished_operator_spill_contexts) | ||
{ | ||
/// clean finished spill context | ||
operator_spill_contexts.erase( | ||
std::remove_if( | ||
operator_spill_contexts.begin(), | ||
operator_spill_contexts.end(), | ||
[](const auto & context) { return context->spillableStageFinished(); }), | ||
operator_spill_contexts.end()); | ||
} | ||
return expected_released_memories; | ||
} | ||
void TaskOperatorSpillContexts::appendAdditionalOperatorSpillContexts() | ||
{ | ||
if (has_additional_operator_spill_contexts) | ||
{ | ||
std::unique_lock lock(mutex); | ||
operator_spill_contexts.splice(operator_spill_contexts.end(), additional_operator_spill_contexts); | ||
has_additional_operator_spill_contexts = false; | ||
additional_operator_spill_contexts.clear(); | ||
} | ||
} | ||
void TaskOperatorSpillContexts::registerOperatorSpillContext(const OperatorSpillContextPtr & operator_spill_context) | ||
{ | ||
if (operator_spill_context->supportAutoTriggerSpill()) | ||
{ | ||
std::unique_lock lock(mutex); | ||
additional_operator_spill_contexts.push_back(operator_spill_context); | ||
has_additional_operator_spill_contexts = true; | ||
} | ||
} | ||
Int64 TaskOperatorSpillContexts::totalRevocableMemories() | ||
{ | ||
if unlikely (isFinished()) | ||
return 0; | ||
appendAdditionalOperatorSpillContexts(); | ||
Int64 ret = 0; | ||
for (const auto & operator_spill_context : operator_spill_contexts) | ||
ret += operator_spill_context->getTotalRevocableMemory(); | ||
return ret; | ||
} | ||
} // namespace DB |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters