Skip to content

SYSTEM PRESHUTDOWN command for graceful shutdown swarm node #852

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: antalya-25.3
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/en/sql-reference/statements/system.md
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,12 @@ SYSTEM RELOAD USERS [ON CLUSTER cluster_name]

Normally shuts down ClickHouse (like `service clickhouse-server stop` / `kill {$pid_clickhouse-server}`)

## PRESHUTDOWN {#preshutdown}

<CloudNotSupportedBadge/>

Prepare node for graceful shutdown. Unregister in autodiscovered clusters, stop accepting distributed requests to object storages (s3Cluster, icebergCluster, etc.).

## KILL {#kill}

Aborts ClickHouse process (like `kill -9 {$ pid_clickhouse-server}`)
Expand Down
2 changes: 1 addition & 1 deletion src/Access/Common/AccessType.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ enum class AccessType : uint8_t
\
M(TABLE_ENGINE, "TABLE ENGINE", TABLE_ENGINE, ALL) \
\
M(SYSTEM_SHUTDOWN, "SYSTEM KILL, SHUTDOWN", GLOBAL, SYSTEM) \
M(SYSTEM_SHUTDOWN, "SYSTEM KILL, SHUTDOWN, PRESHUTDOWN", GLOBAL, SYSTEM) \
M(SYSTEM_DROP_DNS_CACHE, "SYSTEM DROP DNS, DROP DNS CACHE, DROP DNS", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_CONNECTIONS_CACHE, "SYSTEM DROP CONNECTIONS CACHE, DROP CONNECTIONS CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_PREWARM_MARK_CACHE, "SYSTEM PREWARM MARK, PREWARM MARK CACHE, PREWARM MARKS", GLOBAL, SYSTEM_DROP_CACHE) \
Expand Down
27 changes: 27 additions & 0 deletions src/Interpreters/ClusterDiscovery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -454,12 +454,30 @@ void ClusterDiscovery::registerInZk(zkutil::ZooKeeperPtr & zk, ClusterInfo & inf
return;
}

if (context->isPreShutdownCalled())
{
LOG_DEBUG(log, "PreShutdown called, skip self-registering current node {} in cluster {}", current_node_name, info.name);
return;
}

LOG_DEBUG(log, "Registering current node {} in cluster {}", current_node_name, info.name);

zk->createOrUpdate(node_path, info.current_node.serialize(), zkutil::CreateMode::Ephemeral);
LOG_DEBUG(log, "Current node {} registered in cluster {}", current_node_name, info.name);
}

void ClusterDiscovery::unregisterFromZk(zkutil::ZooKeeperPtr & zk, ClusterInfo & info)
{
if (info.current_node_is_observer)
return;

String node_path = getShardsListPath(info.zk_root) / current_node_name;
LOG_DEBUG(log, "Removing current node {} from cluster {}", current_node_name, info.name);

zk->remove(node_path);
LOG_DEBUG(log, "Current node {} removed from cluster {}", current_node_name, info.name);
}

void ClusterDiscovery::initialUpdate()
{
LOG_DEBUG(log, "Initializing");
Expand Down Expand Up @@ -505,6 +523,15 @@ void ClusterDiscovery::initialUpdate()
is_initialized = true;
}

void ClusterDiscovery::unregisterAll()
{
for (auto & [_, info] : clusters_info)
{
auto zk = context->getDefaultOrAuxiliaryZooKeeper(info.zk_name);
unregisterFromZk(zk, info);
}
}

void ClusterDiscovery::findDynamicClusters(
std::unordered_map<String, ClusterDiscovery::ClusterInfo> & info,
std::unordered_set<size_t> * unchanged_roots)
Expand Down
3 changes: 3 additions & 0 deletions src/Interpreters/ClusterDiscovery.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ class ClusterDiscovery

~ClusterDiscovery();

void unregisterAll();

private:
struct NodeInfo
{
Expand Down Expand Up @@ -125,6 +127,7 @@ class ClusterDiscovery
void initialUpdate();

void registerInZk(zkutil::ZooKeeperPtr & zk, ClusterInfo & info);
void unregisterFromZk(zkutil::ZooKeeperPtr & zk, ClusterInfo & info);

Strings getNodeNames(zkutil::ZooKeeperPtr & zk,
const String & zk_root,
Expand Down
25 changes: 23 additions & 2 deletions src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,7 @@ struct ContextSharedPart : boost::noncopyable
std::map<String, UInt16> server_ports;

std::atomic<bool> shutdown_called = false;
std::atomic<bool> preshutdown_called = false;

Stopwatch uptime_watch TSA_GUARDED_BY(mutex);

Expand Down Expand Up @@ -734,6 +735,7 @@ struct ContextSharedPart : boost::noncopyable
*/
void shutdown() TSA_NO_THREAD_SAFETY_ANALYSIS
{
preshutdown_called = true;
bool is_shutdown_called = shutdown_called.exchange(true);
if (is_shutdown_called)
return;
Expand Down Expand Up @@ -913,6 +915,11 @@ struct ContextSharedPart : boost::noncopyable
total_memory_tracker.resetPageCache();
}

void preShutdown()
{
preshutdown_called = true;
}

bool hasTraceCollector() const
{
return trace_collector.has_value();
Expand Down Expand Up @@ -4481,7 +4488,6 @@ std::shared_ptr<Cluster> Context::getCluster(const std::string & cluster_name) c
throw Exception(ErrorCodes::CLUSTER_DOESNT_EXIST, "Requested cluster '{}' not found", cluster_name);
}


std::shared_ptr<Cluster> Context::tryGetCluster(const std::string & cluster_name) const
{
std::shared_ptr<Cluster> res = nullptr;
Expand All @@ -4500,6 +4506,13 @@ std::shared_ptr<Cluster> Context::tryGetCluster(const std::string & cluster_name
return res;
}

void Context::unregisterInDynamicClusters()
{
std::lock_guard lock(shared->clusters_mutex);
if (!shared->cluster_discovery)
return;
shared->cluster_discovery->unregisterAll();
}

void Context::reloadClusterConfig() const
{
Expand Down Expand Up @@ -5350,12 +5363,20 @@ void Context::stopServers(const ServerType & server_type) const
shared->stop_servers_callback(server_type);
}


void Context::shutdown() TSA_NO_THREAD_SAFETY_ANALYSIS
{
shared->shutdown();
}

void Context::preShutdown()
{
shared->preshutdown_called = true;
}

bool Context::isPreShutdownCalled() const
{
return shared->preshutdown_called;
}

Context::ApplicationType Context::getApplicationType() const
{
Expand Down
5 changes: 5 additions & 0 deletions src/Interpreters/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -1225,6 +1225,7 @@ class Context: public ContextData, public std::enable_shared_from_this<Context>
size_t getClustersVersion() const;

void startClusterDiscovery();
void unregisterInDynamicClusters();

/// Sets custom cluster, but doesn't update configuration
void setCluster(const String & cluster_name, const std::shared_ptr<Cluster> & cluster);
Expand Down Expand Up @@ -1335,6 +1336,10 @@ class Context: public ContextData, public std::enable_shared_from_this<Context>

void shutdown();

/// Stop some works to allow graceful shutdown later
void preShutdown();
bool isPreShutdownCalled() const;

bool isInternalQuery() const { return is_internal_query; }
void setInternalQuery(bool internal) { is_internal_query = internal; }

Expand Down
8 changes: 8 additions & 0 deletions src/Interpreters/InterpreterSystemQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,13 @@ BlockIO InterpreterSystemQuery::execute()
throw ErrnoException(ErrorCodes::CANNOT_KILL, "System call kill(0, SIGTERM) failed");
break;
}
case Type::PRESHUTDOWN:
{
getContext()->checkAccess(AccessType::SYSTEM_SHUTDOWN);
getContext()->preShutdown();
getContext()->unregisterInDynamicClusters();
break;
}
case Type::KILL:
{
getContext()->checkAccess(AccessType::SYSTEM_SHUTDOWN);
Expand Down Expand Up @@ -1468,6 +1475,7 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster()
switch (query.type)
{
case Type::SHUTDOWN:
case Type::PRESHUTDOWN:
case Type::KILL:
case Type::SUSPEND:
{
Expand Down
1 change: 1 addition & 0 deletions src/Parsers/ASTSystemQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,7 @@ void ASTSystemQuery::formatImpl(WriteBuffer & ostr, const FormatSettings & setti
}
case Type::KILL:
case Type::SHUTDOWN:
case Type::PRESHUTDOWN:
case Type::DROP_DNS_CACHE:
case Type::DROP_CONNECTIONS_CACHE:
case Type::DROP_MMAP_CACHE:
Expand Down
1 change: 1 addition & 0 deletions src/Parsers/ASTSystemQuery.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ class ASTSystemQuery : public IAST, public ASTQueryWithOnCluster
{
UNKNOWN,
SHUTDOWN,
PRESHUTDOWN,
KILL,
SUSPEND,
DROP_DNS_CACHE,
Expand Down
5 changes: 5 additions & 0 deletions src/QueryPipeline/RemoteQueryExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -984,6 +984,11 @@ void RemoteQueryExecutor::setProfileInfoCallback(ProfileInfoCallback callback)
profile_info_callback = std::move(callback);
}

bool RemoteQueryExecutor::skipUnavailableShards() const
{
return context->getSettingsRef()[Setting::skip_unavailable_shards];
}

bool RemoteQueryExecutor::needToSkipUnavailableShard() const
{
return context->getSettingsRef()[Setting::skip_unavailable_shards] && (0 == connections->size());
Expand Down
2 changes: 2 additions & 0 deletions src/QueryPipeline/RemoteQueryExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,8 @@ class RemoteQueryExecutor

IConnections & getConnections() { return *connections; }

bool skipUnavailableShards() const;

bool needToSkipUnavailableShard() const;

bool isReplicaUnavailable() const { return extension && extension->parallel_reading_coordinator && connections->size() == 0; }
Expand Down
33 changes: 25 additions & 8 deletions src/QueryPipeline/RemoteQueryExecutorReadContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ namespace ErrorCodes
extern const int CANNOT_READ_FROM_SOCKET;
extern const int CANNOT_OPEN_FILE;
extern const int SOCKET_TIMEOUT;
extern const int ATTEMPT_TO_READ_AFTER_EOF;
}

RemoteQueryExecutorReadContext::RemoteQueryExecutorReadContext(
Expand Down Expand Up @@ -56,16 +57,32 @@ void RemoteQueryExecutorReadContext::Task::run(AsyncCallback async_callback, Sus

while (true)
{
read_context.has_read_packet_part = PacketPart::None;

if (read_context.read_packet_type_separately)
try
{
read_context.has_read_packet_part = PacketPart::None;

if (read_context.read_packet_type_separately)
{
read_context.packet.type = read_context.executor.getConnections().receivePacketTypeUnlocked(async_callback);
read_context.has_read_packet_part = PacketPart::Type;
suspend_callback();
}
read_context.packet = read_context.executor.getConnections().receivePacketUnlocked(async_callback);
read_context.has_read_packet_part = PacketPart::Body;
if (read_context.packet.type == Protocol::Server::Data)
read_context.has_data_packets = true;
}
catch (const Exception & e)
{
read_context.packet.type = read_context.executor.getConnections().receivePacketTypeUnlocked(async_callback);
read_context.has_read_packet_part = PacketPart::Type;
suspend_callback();
if (e.code() == ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF
&& !read_context.has_data_packets.load() && read_context.executor.skipUnavailableShards())
{
read_context.has_read_packet_part = PacketPart::None;
}
else
throw;
}
read_context.packet = read_context.executor.getConnections().receivePacketUnlocked(async_callback);
read_context.has_read_packet_part = PacketPart::Body;

suspend_callback();
}
}
Expand Down
1 change: 1 addition & 0 deletions src/QueryPipeline/RemoteQueryExecutorReadContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ class RemoteQueryExecutorReadContext : public AsyncTaskExecutor
/// None -> Type -> Body -> None
/// None -> Body -> None
std::atomic<PacketPart> has_read_packet_part = PacketPart::None;
std::atomic_bool has_data_packets = false;
Packet packet;

RemoteQueryExecutor & executor;
Expand Down
22 changes: 19 additions & 3 deletions src/Storages/ObjectStorage/StorageObjectStorageSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,10 @@ std::shared_ptr<IObjectIterator> StorageObjectStorageSource::createFileIterator(

if (distributed_processing)
{
auto distributed_iterator = std::make_unique<ReadTaskIterator>(local_context->getReadTaskCallback(), local_context->getSettingsRef()[Setting::max_threads]);
auto distributed_iterator = std::make_unique<ReadTaskIterator>(
local_context->getReadTaskCallback(),
local_context->getSettingsRef()[Setting::max_threads],
local_context);

if (is_archive)
return std::make_shared<ArchiveIterator>(object_storage, configuration, std::move(distributed_iterator), local_context, nullptr);
Expand Down Expand Up @@ -937,9 +940,16 @@ StorageObjectStorageSource::ReaderHolder::operator=(ReaderHolder && other) noexc
}

StorageObjectStorageSource::ReadTaskIterator::ReadTaskIterator(
const ReadTaskCallback & callback_, size_t max_threads_count)
: callback(callback_)
const ReadTaskCallback & callback_, size_t max_threads_count, ContextPtr context_)
: WithContext(context_)
, callback(callback_)
{
if (getContext()->isPreShutdownCalled())
Copy link
Collaborator

@ilejn ilejn Jun 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it true that operations prohibited in PreShutdown phase (e.g getting new tasks), are allowed in Shutdown phase?
If yes, is it correct?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I understand ClickHouse does not have specific shutdown phase. On SYSTEM SHUTDOWN just calls kill(0, SIGTERM). Without PRESHUTDOW this caused error on initiator as well as already taken but unfinished tasks.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then what is the purpose of shutdown_called flag?

From the first glance I would expect that all checks that are true for preshutdown_called should be true for shutdown_called as well.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, understood, flag set in destructor.
Yes, make sense to set preshutdown there too.

{
LOG_DEBUG(getLogger("StorageObjectStorageSource"), "PRESHUTDOWN called, stop getting new tasks");
return;
}

ThreadPool pool(
CurrentMetrics::StorageObjectStorageThreads,
CurrentMetrics::StorageObjectStorageThreadsActive,
Expand Down Expand Up @@ -969,6 +979,12 @@ StorageObjectStorage::ObjectInfoPtr StorageObjectStorageSource::ReadTaskIterator
size_t current_index = index.fetch_add(1, std::memory_order_relaxed);
if (current_index >= buffer.size())
{
if (getContext()->isPreShutdownCalled())
{
LOG_DEBUG(getLogger("StorageObjectStorageSource"), "PRESHUTDOWN called, stop getting new tasks");
return nullptr;
}

auto key = callback();
if (key.empty())
return nullptr;
Expand Down
4 changes: 2 additions & 2 deletions src/Storages/ObjectStorage/StorageObjectStorageSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,10 @@ class StorageObjectStorageSource : public SourceWithKeyCondition
void lazyInitialize();
};

class StorageObjectStorageSource::ReadTaskIterator : public IObjectIterator
class StorageObjectStorageSource::ReadTaskIterator : public IObjectIterator, WithContext
{
public:
ReadTaskIterator(const ReadTaskCallback & callback_, size_t max_threads_count);
ReadTaskIterator(const ReadTaskCallback & callback_, size_t max_threads_count, ContextPtr context_);

ObjectInfoPtr next(size_t) override;

Expand Down
1 change: 1 addition & 0 deletions tests/integration/test_s3_cluster/data/graceful/part0.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
0,"Foo"
1 change: 1 addition & 0 deletions tests/integration/test_s3_cluster/data/graceful/part1.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
1,"Bar"
1 change: 1 addition & 0 deletions tests/integration/test_s3_cluster/data/graceful/part2.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
2,"Foo"
1 change: 1 addition & 0 deletions tests/integration/test_s3_cluster/data/graceful/part3.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
3,"Bar"
1 change: 1 addition & 0 deletions tests/integration/test_s3_cluster/data/graceful/part4.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
4,"Foo"
1 change: 1 addition & 0 deletions tests/integration/test_s3_cluster/data/graceful/part5.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
5,"Bar"
1 change: 1 addition & 0 deletions tests/integration/test_s3_cluster/data/graceful/part6.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
6,"Foo"
1 change: 1 addition & 0 deletions tests/integration/test_s3_cluster/data/graceful/part7.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
7,"Bar"
1 change: 1 addition & 0 deletions tests/integration/test_s3_cluster/data/graceful/part8.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
8,"Foo"
1 change: 1 addition & 0 deletions tests/integration/test_s3_cluster/data/graceful/part9.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
9,"Bar"
1 change: 1 addition & 0 deletions tests/integration/test_s3_cluster/data/graceful/partA.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
10,"Foo"
1 change: 1 addition & 0 deletions tests/integration/test_s3_cluster/data/graceful/partB.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
11,"Bar"
1 change: 1 addition & 0 deletions tests/integration/test_s3_cluster/data/graceful/partC.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
12,"Foo"
1 change: 1 addition & 0 deletions tests/integration/test_s3_cluster/data/graceful/partD.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
13,"Bar"
1 change: 1 addition & 0 deletions tests/integration/test_s3_cluster/data/graceful/partE.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
14,"Foo"
1 change: 1 addition & 0 deletions tests/integration/test_s3_cluster/data/graceful/partF.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
15,"Bar"
Loading
Loading