Skip to content

Commit

Permalink
Revert "Revise default background threads size" (#5176)
Browse files Browse the repository at this point in the history
close #5177
  • Loading branch information
Lloyd-Pottiger authored Jun 22, 2022
1 parent 649462a commit 45bc5a4
Show file tree
Hide file tree
Showing 8 changed files with 27 additions and 54 deletions.
1 change: 1 addition & 0 deletions dbms/src/Core/Defines.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
/// too short a period can cause errors to disappear immediately after creation.
#define DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_DECREASE_ERROR_PERIOD (2 * DBMS_DEFAULT_SEND_TIMEOUT_SEC)
#define DEFAULT_QUERIES_QUEUE_WAIT_TIME_MS 5000 /// Maximum waiting time in the request queue.
#define DBMS_DEFAULT_BACKGROUND_POOL_SIZE 16

#define DBMS_MIN_REVISION_WITH_CLIENT_INFO 54032
#define DBMS_MIN_REVISION_WITH_SERVER_TIMEZONE 54058
Expand Down
24 changes: 6 additions & 18 deletions dbms/src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,10 @@
#include <fmt/core.h>

#include <boost/functional/hash/hash.hpp>
#include <map>
#include <pcg_random.hpp>
#include <unordered_map>
#include <set>


namespace ProfileEvents
{
Expand Down Expand Up @@ -1439,33 +1441,19 @@ void Context::dropCaches() const
}

BackgroundProcessingPool & Context::getBackgroundPool()
{
// Note: shared->background_pool should be initialized first.
auto lock = getLock();
return *shared->background_pool;
}

BackgroundProcessingPool & Context::initializeBackgroundPool(UInt16 pool_size)
{
auto lock = getLock();
if (!shared->background_pool)
shared->background_pool = std::make_shared<BackgroundProcessingPool>(pool_size);
shared->background_pool = std::make_shared<BackgroundProcessingPool>(settings.background_pool_size);
return *shared->background_pool;
}

BackgroundProcessingPool & Context::getBlockableBackgroundPool()
{
// TODO: maybe a better name for the pool
// Note: shared->blockable_background_pool should be initialized first.
auto lock = getLock();
return *shared->blockable_background_pool;
}

BackgroundProcessingPool & Context::initializeBlockableBackgroundPool(UInt16 pool_size)
{
// TODO: choose a better thread pool size and maybe a better name for the pool
auto lock = getLock();
if (!shared->blockable_background_pool)
shared->blockable_background_pool = std::make_shared<BackgroundProcessingPool>(pool_size);
shared->blockable_background_pool = std::make_shared<BackgroundProcessingPool>(settings.background_pool_size);
return *shared->blockable_background_pool;
}

Expand Down
4 changes: 1 addition & 3 deletions dbms/src/Interpreters/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -380,9 +380,7 @@ class Context
bool useL0Opt() const;

BackgroundProcessingPool & getBackgroundPool();
BackgroundProcessingPool & initializeBackgroundPool(UInt16 pool_size);
BackgroundProcessingPool & getBlockableBackgroundPool();
BackgroundProcessingPool & initializeBlockableBackgroundPool(UInt16 pool_size);

void createTMTContext(const TiFlashRaftConfig & raft_config, pingcap::ClusterConfig && cluster_config);

Expand Down Expand Up @@ -507,7 +505,7 @@ class DDLGuard
class SessionCleaner
{
public:
explicit SessionCleaner(Context & context_)
SessionCleaner(Context & context_)
: context{context_}
{}
~SessionCleaner();
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Interpreters/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ struct Settings
M(SettingBool, extremes, false, "Calculate minimums and maximums of the result columns. They can be output in JSON-formats.") \
M(SettingBool, use_uncompressed_cache, true, "Whether to use the cache of uncompressed blocks.") \
M(SettingBool, replace_running_query, false, "Whether the running request should be canceled with the same id as the new one.") \
M(SettingUInt64, background_pool_size, 0, "Number of threads performing background work for tables (for example, merging in merge tree). Only effective at server startup. " \
"0 means a quarter of the number of logical CPU cores of the machine.") \
M(SettingUInt64, background_pool_size, DBMS_DEFAULT_BACKGROUND_POOL_SIZE, "Number of threads performing background work for tables (for example, merging in merge tree). Only has meaning at server " \
"startup.") \
\
M(SettingBool, optimize_move_to_prewhere, true, "Allows disabling WHERE to PREWHERE optimization in SELECT queries from MergeTree.") \
\
Expand Down Expand Up @@ -356,7 +356,7 @@ struct Settings
M(SettingUInt64, elastic_threadpool_shrink_period_ms, 300000, "The shrink period(ms) of elastic thread pool.") \
M(SettingBool, enable_local_tunnel, true, "Enable local data transfer between local MPP tasks.") \
M(SettingBool, enable_async_grpc_client, true, "Enable async grpc in MPP.") \
M(SettingUInt64, grpc_completion_queue_pool_size, 0, "The size of gRPC completion queue pool. 0 means the number of logical CPU cores. Only effective at server startup")\
M(SettingUInt64, grpc_completion_queue_pool_size, 0, "The size of gRPC completion queue pool. 0 means using hardware_concurrency.")\
M(SettingBool, enable_async_server, true, "Enable async rpc server.") \
M(SettingUInt64, async_pollers_per_cq, 200, "grpc async pollers per cqs") \
M(SettingUInt64, async_cqs, 1, "grpc async cqs") \
Expand Down
33 changes: 14 additions & 19 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,10 @@
#include <Poco/Net/NetException.h>
#include <Poco/StringTokenizer.h>
#include <Poco/Timestamp.h>
#include <Server/HTTPHandlerFactory.h>
#include <Server/MetricsPrometheus.h>
#include <Server/MetricsTransmitter.h>
#include <Server/RaftConfigParser.h>
#include <Server/Server.h>
#include <Server/ServerInfo.h>
#include <Server/StatusFile.h>
#include <Server/StorageConfigParser.h>
#include <Server/TCPHandlerFactory.h>
#include <Server/UserConfigParser.h>
#include <Storages/FormatVersion.h>
#include <Storages/IManageableStorage.h>
Expand All @@ -86,6 +81,12 @@
#include <limits>
#include <memory>

#include "HTTPHandlerFactory.h"
#include "MetricsPrometheus.h"
#include "MetricsTransmitter.h"
#include "StatusFile.h"
#include "TCPHandlerFactory.h"

#if Poco_NetSSL_FOUND
#include <Poco/Net/Context.h>
#include <Poco/Net/SecureServerSocket.h>
Expand Down Expand Up @@ -1127,19 +1128,6 @@ int Server::main(const std::vector<std::string> & /*args*/)
global_context->getPathCapacity(),
global_context->getFileProvider());

/// if default value of background_pool_size is 0
/// set it to the a quarter of the number of logical CPU cores of machine.
Settings & settings = global_context->getSettingsRef();
if (settings.background_pool_size == 0)
{
global_context->setSetting("background_pool_size", std::to_string(server_info.cpu_info.logical_cores / 4));
}
LOG_FMT_INFO(log, "Background & Blockable Background pool size: {}", settings.background_pool_size);

/// Initialize the background & blockable background thread pool.
auto & bg_pool = global_context->initializeBackgroundPool(settings.background_pool_size);
auto & blockable_bg_pool = global_context->initializeBlockableBackgroundPool(settings.background_pool_size);

global_context->initializePageStorageMode(global_context->getPathPool(), STORAGE_FORMAT_CURRENT.page);
global_context->initializeGlobalStoragePoolIfNeed(global_context->getPathPool());
LOG_FMT_INFO(log, "Global PageStorage run mode is {}", static_cast<UInt8>(global_context->getPageStorageRunMode()));
Expand Down Expand Up @@ -1256,6 +1244,13 @@ int Server::main(const std::vector<std::string> & /*args*/)
/// Load global settings from default_profile and system_profile.
/// It internally depends on UserConfig::parseSettings.
global_context->setDefaultProfiles(config());
Settings & settings = global_context->getSettingsRef();

/// Initialize the background thread pool.
/// It internally depends on settings.background_pool_size,
/// so must be called after settings has been load.
auto & bg_pool = global_context->getBackgroundPool();
auto & blockable_bg_pool = global_context->getBlockableBackgroundPool();

/// Initialize RateLimiter.
global_context->initializeRateLimiter(config(), bg_pool, blockable_bg_pool);
Expand Down Expand Up @@ -1407,7 +1402,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
{
auto size = settings.grpc_completion_queue_pool_size;
if (size == 0)
size = server_info.cpu_info.logical_cores;
size = std::thread::hardware_concurrency();
GRPCCompletionQueuePool::global_instance = std::make_unique<GRPCCompletionQueuePool>(size);
}

Expand Down
3 changes: 0 additions & 3 deletions dbms/src/Storages/BackgroundProcessingPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,6 @@ BackgroundProcessingPool::BackgroundProcessingPool(int size_)
: size(size_)
, thread_ids_counter(size_)
{
if (size <= 0)
throw Exception("BackgroundProcessingPool size must be greater than 0", ErrorCodes::LOGICAL_ERROR);

LOG_FMT_INFO(&Poco::Logger::get("BackgroundProcessingPool"), "Create BackgroundProcessingPool with {} threads", size);

threads.resize(size);
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/BackgroundProcessingPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class BackgroundProcessingPool
using TaskHandle = std::shared_ptr<TaskInfo>;


explicit BackgroundProcessingPool(int size_);
BackgroundProcessingPool(int size_);

size_t getNumberOfThreads() const { return size; }

Expand All @@ -96,7 +96,7 @@ class BackgroundProcessingPool
/// 2. thread B also get the same task
/// 3. thread A finish the execution of the task quickly, release the task and try to update the next schedule time of the task
/// 4. thread B find the task is not occupied and execute the task again almost immediately
TaskHandle addTask(const Task & task, bool multi = true, size_t interval_ms = 0);
TaskHandle addTask(const Task & task, const bool multi = true, const size_t interval_ms = 0);
void removeTask(const TaskHandle & task);

~BackgroundProcessingPool();
Expand Down
6 changes: 0 additions & 6 deletions dbms/src/TestUtils/TiFlashTestEnv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
#include <Storages/Transaction/TMTContext.h>
#include <TestUtils/TiFlashTestEnv.h>

#include <thread>

namespace DB::tests
{
std::unique_ptr<Context> TiFlashTestEnv::global_context = nullptr;
Expand All @@ -41,10 +39,6 @@ void TiFlashTestEnv::initializeGlobalContext(Strings testdata_path, PageStorageR
KeyManagerPtr key_manager = std::make_shared<MockKeyManager>(false);
global_context->initializeFileProvider(key_manager, false);

// initialize background & blockable background thread pool
global_context->initializeBackgroundPool(std::thread::hardware_concurrency() / 4);
global_context->initializeBlockableBackgroundPool(std::thread::hardware_concurrency() / 4);

// Theses global variables should be initialized by the following order
// 1. capacity
// 2. path pool
Expand Down

0 comments on commit 45bc5a4

Please sign in to comment.