Skip to content

Commit

Permalink
Merge branch 'master' into bugfix/3475
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Dec 28, 2021
2 parents 183d9d1 + 6b24135 commit 790f737
Show file tree
Hide file tree
Showing 57 changed files with 1,287 additions and 349 deletions.
2 changes: 1 addition & 1 deletion .ci/build.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ catchError {

def NPROC = 5

util.runWithTiCSFull("build-tics", CURWS) {
util.runCheckoutAndBuilderClosure("build-tics-v1", CURWS) {
dir("${CURWS}/tics") {
stage("Build & Upload") {
timeout(time: 70, unit: 'MINUTES') {
Expand Down
18 changes: 10 additions & 8 deletions .ci/integration_test.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -41,22 +41,24 @@ catchError {
stash includes: "tics/**", name: "git-code-tics", useDefaultExcludes: false
}

def pod_label = "tics-integration-test-v1"

parallel (
"tidb ci test": {
def label = "tidb-ci-test"
util.runTest(label, "tics/tests/tidb-ci", tidbBranch)
def name = "tidb-ci-test"
util.runTest(pod_label, name, "tics/tests/tidb-ci", tidbBranch)
},
"delta merge test": {
def label = "delta-merge-test"
util.runTest(label, "tics/tests/delta-merge-test", tidbBranch)
def name = "delta-merge-test"
util.runTest(pod_label, name, "tics/tests/delta-merge-test", tidbBranch)
},
"fullstack test": {
def label = "fullstack-test"
util.runTest(label, "tics/tests/fullstack-test", tidbBranch)
def name = "fullstack-test"
util.runTest(pod_label, name, "tics/tests/fullstack-test", tidbBranch)
},
"fullstack test2": {
def label = "fullstack-test2"
util.runTest(label, "tics/tests/fullstack-test2", tidbBranch)
def name = "fullstack-test2"
util.runTest(pod_label, name, "tics/tests/fullstack-test2", tidbBranch)
},
)
}
Expand Down
2 changes: 1 addition & 1 deletion .ci/unit_test.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ catchError {

def NPROC = 5

util.runUTCoverTICS(CURWS, NPROC)
util.runUnitTests("ut-tics-v1", CURWS, NPROC)

}

Expand Down
38 changes: 26 additions & 12 deletions .ci/util.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,8 @@ def checkoutTiCSFull(commit, pullId) {
])
}

def runClosure(label, Closure body) {
def runBuilderClosure(label, Closure body) {
podTemplate(name: label, label: label, instanceCap: 15, containers: [
containerTemplate(name: 'dockerd', image: 'docker:18.09.6-dind', privileged: true,
resourceRequestCpu: '5000m', resourceRequestMemory: '10Gi',
resourceLimitCpu: '16000m', resourceLimitMemory: '32Gi'),
containerTemplate(name: 'docker', image: 'hub.pingcap.net/jenkins/docker:build-essential-java',
alwaysPullImage: true, envVars: [
envVar(key: 'DOCKER_HOST', value: 'tcp://localhost:2375'),
Expand All @@ -94,8 +91,25 @@ def runClosure(label, Closure body) {
}
}

def runWithTiCSFull(label, curws, Closure body) {
runClosure(label) {
def runClosure(label, Closure body) {
podTemplate(name: label, label: label, instanceCap: 15, containers: [
containerTemplate(name: 'dockerd', image: 'docker:18.09.6-dind', privileged: true,
resourceRequestCpu: '5000m', resourceRequestMemory: '10Gi',
resourceLimitCpu: '16000m', resourceLimitMemory: '32Gi'),
containerTemplate(name: 'docker', image: 'hub.pingcap.net/jenkins/docker:build-essential-java',
alwaysPullImage: true, envVars: [
envVar(key: 'DOCKER_HOST', value: 'tcp://localhost:2375'),
], ttyEnabled: true, command: 'cat'),
]
) {
node(label) {
body()
}
}
}

def runCheckoutAndBuilderClosure(label, curws, Closure body) {
runBuilderClosure(label) {
dir("${curws}/tics") {
stage("Checkout") {
container("docker") {
Expand All @@ -119,7 +133,7 @@ def runWithTiCSFull(label, curws, Closure body) {
}
}

def runTest(label, testPath, tidbBranch) {
def runTest(label, name, testPath, tidbBranch) {
runClosure(label) {
stage("Unstash") {
unstash 'git-code-tics'
Expand All @@ -142,9 +156,9 @@ def runTest(label, testPath, tidbBranch) {
sh "pwd"
sh "TAG=${params.ghprbActualCommit} BRANCH=${tidbBranch} bash -xe ./run.sh"
} catch (e) {
sh "mv log ${label}-log"
archiveArtifacts(artifacts: "${label}-log/**/*.log", allowEmptyArchive: true)
sh "find log -name '*.log' | xargs tail -n 500"
sh "mv log ${name}-log"
archiveArtifacts(artifacts: "${name}-log/**/*.log", allowEmptyArchive: true)
sh "find ${name}-log -name '*.log' | xargs tail -n 500"
sh "docker ps -a"
throw e
}
Expand All @@ -155,9 +169,9 @@ def runTest(label, testPath, tidbBranch) {
}
}

def runUTCoverTICS(CURWS, NPROC) {
def runUnitTests(label, CURWS, NPROC) {
def NPROC_UT = NPROC * 2
runWithTiCSFull("ut-tics", CURWS) {
runCheckoutAndBuilderClosure(label, CURWS) {
dir("${CURWS}/tics") {
stage("Build") {
timeout(time: 70, unit: 'MINUTES') {
Expand Down
30 changes: 30 additions & 0 deletions dbms/src/Common/MemoryTrackerSetter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#pragma once

#include <Common/MemoryTracker.h>

#include <boost/noncopyable.hpp>

namespace DB
{
class MemoryTrackerSetter : private boost::noncopyable
{
public:
MemoryTrackerSetter(bool enable_, MemoryTracker * memory_tracker)
: enable(enable_)
, old_memory_tracker(current_memory_tracker)
{
if (enable)
current_memory_tracker = memory_tracker;
}

~MemoryTrackerSetter()
{
if (enable)
current_memory_tracker = old_memory_tracker;
}

private:
bool enable;
MemoryTracker * old_memory_tracker;
};
} // namespace DB
43 changes: 9 additions & 34 deletions dbms/src/Common/ThreadFactory.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#pragma once

#include <Common/MemoryTracker.h>
#include <Common/MemoryTrackerSetter.h>
#include <Common/setThreadName.h>
#include <common/ThreadPool.h>

Expand All @@ -17,55 +17,30 @@ namespace DB
class ThreadFactory
{
public:
/// force_overwrite_thread_attribute is only used for ThreadPool's jobs.
/// For new threads it is treated as always true.
explicit ThreadFactory(bool force_overwrite_thread_attribute = false, std::string thread_name_ = "")
: force_overwrite(force_overwrite_thread_attribute)
, thread_name(thread_name_)
{}

ThreadFactory(const ThreadFactory &) = delete;
ThreadFactory & operator=(const ThreadFactory &) = delete;

ThreadFactory(ThreadFactory &&) = default;
ThreadFactory & operator=(ThreadFactory &&) = default;

template <typename F, typename... Args>
std::thread newThread(F && f, Args &&... args)
static std::thread newThread(String thread_name, F && f, Args &&... args)
{
auto memory_tracker = current_memory_tracker;
auto wrapped_func = [memory_tracker, thread_name = thread_name, f = std::move(f)](auto &&... args) {
setAttributes(memory_tracker, thread_name, true);
auto wrapped_func = [memory_tracker, thread_name = std::move(thread_name), f = std::move(f)](auto &&... args) {
MemoryTrackerSetter setter(true, memory_tracker);
if (!thread_name.empty())
setThreadName(thread_name.c_str());
return std::invoke(f, std::forward<Args>(args)...);
};
return std::thread(wrapped_func, std::forward<Args>(args)...);
}

template <typename F, typename... Args>
ThreadPool::Job newJob(F && f, Args &&... args)
static ThreadPool::Job newJob(F && f, Args &&... args)
{
auto memory_tracker = current_memory_tracker;
/// Use std::tuple to workaround the limit on the lambda's init-capture of C++17.
/// See https://stackoverflow.com/questions/47496358/c-lambdas-how-to-capture-variadic-parameter-pack-from-the-upper-scope
return [force_overwrite = force_overwrite, memory_tracker, thread_name = thread_name, f = std::move(f), args = std::make_tuple(std::move(args)...)] {
setAttributes(memory_tracker, thread_name, force_overwrite);
return [memory_tracker, f = std::move(f), args = std::make_tuple(std::move(args)...)] {
MemoryTrackerSetter setter(true, memory_tracker);
return std::apply(f, std::move(args));
};
}

private:
static void setAttributes(MemoryTracker * memory_tracker, const std::string & thread_name, bool force_overwrite)
{
if (force_overwrite || !current_memory_tracker)
{
current_memory_tracker = memory_tracker;
if (!thread_name.empty())
setThreadName(thread_name.c_str());
}
}

bool force_overwrite = false;
std::string thread_name;
};

} // namespace DB
16 changes: 11 additions & 5 deletions dbms/src/Common/tests/gtest_cpu_affinity_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,18 +86,24 @@ TEST(CPUAffinityManager_test, CPUAffinityManager)

CPUAffinityConfig config;
config.query_cpu_percent = 60;
config.cpu_cores = 40;
config.cpu_cores = std::thread::hardware_concurrency();
cpu_affinity.init(config);

ASSERT_TRUE(cpu_affinity.enable());
ASSERT_EQ(cpu_affinity.getOtherCPUCores(), 16);
ASSERT_EQ(cpu_affinity.getQueryCPUCores(), 24);

std::vector<int> except_other_cpu_set{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15};
std::vector<int> except_other_cpu_set;
for (int i = 0; i < cpu_affinity.getOtherCPUCores(); i++)
{
except_other_cpu_set.push_back(i);
}
auto other_cpu_set = cpu_affinity.cpuSetToVec(cpu_affinity.other_cpu_set);
ASSERT_EQ(other_cpu_set, except_other_cpu_set);

std::vector<int> except_query_cpu_set{16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39};
std::vector<int> except_query_cpu_set;
for (int i = 0; i < cpu_affinity.getQueryCPUCores(); i++)
{
except_query_cpu_set.push_back(cpu_affinity.getOtherCPUCores() + i);
}
auto query_cpu_set = cpu_affinity.cpuSetToVec(cpu_affinity.query_cpu_set);
ASSERT_EQ(query_cpu_set, except_query_cpu_set);

Expand Down
19 changes: 8 additions & 11 deletions dbms/src/DataStreams/AsynchronousBlockInputStream.h
Original file line number Diff line number Diff line change
@@ -1,23 +1,21 @@
#pragma once

#include <Poco/Event.h>

#include <DataStreams/IProfilingBlockInputStream.h>
#include <Common/setThreadName.h>
#include <Common/CurrentMetrics.h>
#include <Common/MemoryTracker.h>
#include <Common/ThreadFactory.h>
#include <Common/setThreadName.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <Poco/Event.h>
#include <common/ThreadPool.h>
#include <Common/MemoryTracker.h>


namespace CurrentMetrics
{
extern const Metric QueryThread;
extern const Metric QueryThread;
}

namespace DB
{

/** Executes another BlockInputStream in a separate thread.
* This serves two purposes:
* 1. Allows you to make the different stages of the query execution pipeline work in parallel.
Expand Down Expand Up @@ -101,7 +99,7 @@ class AsynchronousBlockInputStream : public IProfilingBlockInputStream
calculate();
started = true;
}
else /// If the calculations are already in progress - wait for the result
else /// If the calculations are already in progress - wait for the result
pool.wait();

if (exception)
Expand All @@ -122,7 +120,7 @@ class AsynchronousBlockInputStream : public IProfilingBlockInputStream
void next()
{
ready.reset();
pool.schedule(ThreadFactory(false, "AsyncBlockInput").newJob([this] { calculate(); }));
pool.schedule(ThreadFactory::newJob([this] { calculate(); }));
}


Expand Down Expand Up @@ -150,5 +148,4 @@ class AsynchronousBlockInputStream : public IProfilingBlockInputStream
}
};

}

} // namespace DB
2 changes: 1 addition & 1 deletion dbms/src/DataStreams/CreatingSetsBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ void CreatingSetsBlockInputStream::createAll()
{
if (isCancelledOrThrowIfKilled())
return;
workers.emplace_back(ThreadFactory(true, "CreatingSets").newThread([this, &subquery = elem.second] { createOne(subquery); }));
workers.emplace_back(ThreadFactory::newThread("CreatingSets", [this, &subquery = elem.second] { createOne(subquery); }));
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_in_creating_set_input_stream);
}
}
Expand Down
32 changes: 16 additions & 16 deletions dbms/src/DataStreams/DedupSortedBlockInputStream.cpp
Original file line number Diff line number Diff line change
@@ -1,33 +1,34 @@
#include <DataStreams/DedupSortedBlockInputStream.h>

#include <Common/setThreadName.h>
#include <Common/CurrentMetrics.h>
#include <Common/ThreadFactory.h>
#include <Common/setThreadName.h>
#include <DataStreams/DedupSortedBlockInputStream.h>

// #define DEDUP_TRACER
#ifndef DEDUP_TRACER
#define TRACER(message)
#define TRACER(message)
#else
#define TRACER(message) LOG_TRACE(log, message)
#define TRACER(message) LOG_TRACE(log, message)
#endif

namespace CurrentMetrics
{
// TODO: increase it
extern const Metric QueryThread;
}
// TODO: increase it
extern const Metric QueryThread;
} // namespace CurrentMetrics

namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int LOGICAL_ERROR;
}

namespace DB
{

DedupSortedBlockInputStream::DedupSortedBlockInputStream(BlockInputStreams & inputs_, const SortDescription & description_)
: description(description_), queue_max(1), source_blocks(inputs_.size(), queue_max),
output_block(inputs_.size() * queue_max), readers(inputs_.size())
: description(description_)
, queue_max(1)
, source_blocks(inputs_.size(), queue_max)
, output_block(inputs_.size() * queue_max)
, readers(inputs_.size())
{
log = &Poco::Logger::get("DedupSorted");

Expand All @@ -37,7 +38,7 @@ DedupSortedBlockInputStream::DedupSortedBlockInputStream(BlockInputStreams & inp
readers.schedule(std::bind(&DedupSortedBlockInputStream::asynFetch, this, i));

LOG_DEBUG(log, "Start deduping in single thread, using priority-queue");
dedup_thread = std::make_unique<std::thread>(ThreadFactory().newThread([this] { asyncDedupByQueue(); }));
dedup_thread = std::make_unique<std::thread>(ThreadFactory::newThread("AsyncDedup", [this] { asyncDedupByQueue(); }));
}


Expand Down Expand Up @@ -234,8 +235,7 @@ void DedupSortedBlockInputStream::asyncDedupByQueue()
}
}

LOG_DEBUG(log, "P All Done. Bounds " << bounds.str() << " Queue " << queue.str() <<
"Streams finished " << finished_streams << "/" << cursors.size());
LOG_DEBUG(log, "P All Done. Bounds " << bounds.str() << " Queue " << queue.str() << "Streams finished " << finished_streams << "/" << cursors.size());
}


Expand Down Expand Up @@ -282,4 +282,4 @@ void DedupSortedBlockInputStream::pushBlockBounds(const DedupingBlockPtr & block
}


}
} // namespace DB
Loading

0 comments on commit 790f737

Please sign in to comment.