Skip to content

Fix rendezvous hashing in complex queries #760

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 1 addition & 11 deletions src/Storages/IStorageCluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,17 +69,7 @@ void ReadFromCluster::createExtension(const ActionsDAG::Node * predicate)
if (extension)
return;

std::vector<std::string> ids_of_hosts;
for (const auto & shard : cluster->getShardsInfo())
{
if (shard.per_replica_pools.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cluster {} with empty shard {}", cluster->getName(), shard.shard_num);
if (!shard.per_replica_pools[0])
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cluster {}, shard {} with empty node", cluster->getName(), shard.shard_num);
ids_of_hosts.push_back(shard.per_replica_pools[0]->getAddress());
}

extension = storage->getTaskIteratorExtension(predicate, context, ids_of_hosts);
extension = storage->getTaskIteratorExtension(predicate, context, cluster);
}

/// The code executes on initiator
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/IStorageCluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class IStorageCluster : public IStorage
virtual RemoteQueryExecutor::Extension getTaskIteratorExtension(
const ActionsDAG::Node * predicate,
const ContextPtr & context,
std::optional<std::vector<std::string>> ids_of_hosts = std::nullopt) const = 0;
ClusterPtr cluster) const = 0;

QueryProcessingStage::Enum getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageSnapshotPtr &, SelectQueryInfo &) const override;

Expand Down
17 changes: 15 additions & 2 deletions src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -340,13 +340,26 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded(
RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExtension(
const ActionsDAG::Node * predicate,
const ContextPtr & local_context,
std::optional<std::vector<std::string>> ids_of_replicas) const
ClusterPtr cluster) const
{
auto iterator = StorageObjectStorageSource::createFileIterator(
configuration, configuration->getQuerySettings(local_context), object_storage, /* distributed_processing */false,
local_context, predicate, {}, getVirtualsList(), nullptr, local_context->getFileProgressCallback());

auto task_distributor = std::make_shared<StorageObjectStorageStableTaskDistributor>(iterator, ids_of_replicas);
std::vector<std::string> ids_of_hosts;
for (const auto & shard : cluster->getShardsInfo())
{
if (shard.per_replica_pools.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cluster {} with empty shard {}", cluster->getName(), shard.shard_num);
for (const auto & replica : shard.per_replica_pools)
{
if (!replica)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cluster {}, shard {} with empty node", cluster->getName(), shard.shard_num);
ids_of_hosts.push_back(replica->getAddress());
}
}

auto task_distributor = std::make_shared<StorageObjectStorageStableTaskDistributor>(iterator, ids_of_hosts);

auto callback = std::make_shared<TaskIterator>(
[task_distributor](size_t number_of_current_replica) mutable -> String {
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/ObjectStorage/StorageObjectStorageCluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class StorageObjectStorageCluster : public IStorageCluster
RemoteQueryExecutor::Extension getTaskIteratorExtension(
const ActionsDAG::Node * predicate,
const ContextPtr & context,
std::optional<std::vector<std::string>> ids_of_replicas) const override;
ClusterPtr cluster) const override;

String getPathSample(StorageInMemoryMetadata metadata, ContextPtr context);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ namespace DB

StorageObjectStorageStableTaskDistributor::StorageObjectStorageStableTaskDistributor(
std::shared_ptr<IObjectIterator> iterator_,
std::optional<std::vector<std::string>> ids_of_nodes_)
std::vector<std::string> ids_of_nodes_)
: iterator(std::move(iterator_))
, connection_to_files(ids_of_nodes_.has_value() ? ids_of_nodes_.value().size() : 1)
, connection_to_files(ids_of_nodes_.size())
, ids_of_nodes(ids_of_nodes_)
, iterator_exhausted(false)
{
Expand Down Expand Up @@ -38,22 +38,18 @@ std::optional<String> StorageObjectStorageStableTaskDistributor::getNextTask(siz

size_t StorageObjectStorageStableTaskDistributor::getReplicaForFile(const String & file_path)
{
if (!ids_of_nodes.has_value())
throw Exception(ErrorCodes::LOGICAL_ERROR, "No list of nodes inside Task Distributer.");

const auto & ids_of_nodes_value = ids_of_nodes.value();
size_t nodes_count = ids_of_nodes_value.size();
size_t nodes_count = ids_of_nodes.size();

/// Trivial case
if (nodes_count < 2)
return 0;

/// Rendezvous hashing
size_t best_id = 0;
UInt64 best_weight = sipHash64(ids_of_nodes_value[0] + file_path);
UInt64 best_weight = sipHash64(ids_of_nodes[0] + file_path);
for (size_t id = 1; id < nodes_count; ++id)
{
UInt64 weight = sipHash64(ids_of_nodes_value[id] + file_path);
UInt64 weight = sipHash64(ids_of_nodes[id] + file_path);
if (weight > best_weight)
{
best_weight = weight;
Expand All @@ -67,6 +63,14 @@ std::optional<String> StorageObjectStorageStableTaskDistributor::getPreQueuedFil
{
std::lock_guard lock(mutex);

if (connection_to_files.size() <= number_of_current_replica)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Replica number {} is out of range. Expected range: [0, {})",
number_of_current_replica,
connection_to_files.size()
);

auto & files = connection_to_files[number_of_current_replica];

while (!files.empty())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class StorageObjectStorageStableTaskDistributor
public:
StorageObjectStorageStableTaskDistributor(
std::shared_ptr<IObjectIterator> iterator_,
std::optional<std::vector<std::string>> ids_of_nodes_);
std::vector<std::string> ids_of_nodes_);

std::optional<String> getNextTask(size_t number_of_current_replica);

Expand All @@ -33,7 +33,7 @@ class StorageObjectStorageStableTaskDistributor
std::vector<std::vector<String>> connection_to_files;
std::unordered_set<String> unprocessed_files;

std::optional<std::vector<std::string>> ids_of_nodes;
std::vector<std::string> ids_of_nodes;

std::mutex mutex;
bool iterator_exhausted = false;
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/StorageDistributed.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1160,7 +1160,7 @@ std::optional<QueryPipeline> StorageDistributed::distributedWriteFromClusterStor
const auto cluster = getCluster();

/// Select query is needed for pruining on virtual columns
auto extension = src_storage_cluster.getTaskIteratorExtension(predicate, local_context);
auto extension = src_storage_cluster.getTaskIteratorExtension(predicate, local_context, cluster);

/// Here we take addresses from destination cluster and assume source table exists on these nodes
size_t replica_index = 0;
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/StorageFileCluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ void StorageFileCluster::updateQueryToSendIfNeeded(DB::ASTPtr & query, const Sto
RemoteQueryExecutor::Extension StorageFileCluster::getTaskIteratorExtension(
const ActionsDAG::Node * predicate,
const ContextPtr & context,
std::optional<std::vector<std::string>>) const
ClusterPtr) const
{
auto iterator = std::make_shared<StorageFileSource::FilesIterator>(paths, std::nullopt, predicate, getVirtualsList(), context);
auto callback = std::make_shared<TaskIterator>([iter = std::move(iterator)](size_t) mutable -> String { return iter->next(); });
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/StorageFileCluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class StorageFileCluster : public IStorageCluster
RemoteQueryExecutor::Extension getTaskIteratorExtension(
const ActionsDAG::Node * predicate,
const ContextPtr & context,
std::optional<std::vector<std::string>> ids_of_nodes) const override;
ClusterPtr) const override;

private:
void updateQueryToSendIfNeeded(ASTPtr & query, const StorageSnapshotPtr & storage_snapshot, const ContextPtr & context) override;
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/StorageReplicatedMergeTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6007,7 +6007,7 @@ std::optional<QueryPipeline> StorageReplicatedMergeTree::distributedWriteFromClu
ContextMutablePtr query_context = Context::createCopy(local_context);
query_context->increaseDistributedDepth();

auto extension = src_storage_cluster->getTaskIteratorExtension(nullptr, local_context);
auto extension = src_storage_cluster->getTaskIteratorExtension(nullptr, local_context, src_cluster);

size_t replica_index = 0;
for (const auto & replicas : src_cluster->getShardsAddresses())
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/StorageURLCluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ void StorageURLCluster::updateQueryToSendIfNeeded(ASTPtr & query, const StorageS
RemoteQueryExecutor::Extension StorageURLCluster::getTaskIteratorExtension(
const ActionsDAG::Node * predicate,
const ContextPtr & context,
std::optional<std::vector<std::string>>) const
ClusterPtr) const
{
auto iterator = std::make_shared<StorageURLSource::DisclosedGlobIterator>(
uri, context->getSettingsRef()[Setting::glob_expansion_max_elements], predicate, getVirtualsList(), context);
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/StorageURLCluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class StorageURLCluster : public IStorageCluster
RemoteQueryExecutor::Extension getTaskIteratorExtension(
const ActionsDAG::Node * predicate,
const ContextPtr & context,
std::optional<std::vector<std::string>> ids_of_replicas) const override;
ClusterPtr) const override;

private:
void updateQueryToSendIfNeeded(ASTPtr & query, const StorageSnapshotPtr & storage_snapshot, const ContextPtr & context) override;
Expand Down
Loading