Skip to content

Commit 0ece70e

Browse files
Enmkianton-ru
authored andcommitted
Merge pull request #760 from Altinity/feature/fix_rendezvous_hashing
Fix rendezvous hashing in complex queries
1 parent 76c1123 commit 0ece70e

12 files changed

+39
-32
lines changed

src/Storages/IStorageCluster.cpp

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -112,17 +112,7 @@ void ReadFromCluster::createExtension(const ActionsDAG::Node * predicate)
112112
if (extension)
113113
return;
114114

115-
std::vector<std::string> ids_of_hosts;
116-
for (const auto & shard : cluster->getShardsInfo())
117-
{
118-
if (shard.per_replica_pools.empty())
119-
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cluster {} with empty shard {}", cluster->getName(), shard.shard_num);
120-
if (!shard.per_replica_pools[0])
121-
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cluster {}, shard {} with empty node", cluster->getName(), shard.shard_num);
122-
ids_of_hosts.push_back(shard.per_replica_pools[0]->getAddress());
123-
}
124-
125-
extension = storage->getTaskIteratorExtension(predicate, context, ids_of_hosts);
115+
extension = storage->getTaskIteratorExtension(predicate, context, cluster);
126116
}
127117

128118
/// The code executes on initiator

src/Storages/IStorageCluster.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ class IStorageCluster : public IStorage
3636
virtual RemoteQueryExecutor::Extension getTaskIteratorExtension(
3737
const ActionsDAG::Node * predicate,
3838
const ContextPtr & context,
39-
std::optional<std::vector<std::string>> ids_of_hosts = std::nullopt) const = 0;
39+
ClusterPtr cluster) const = 0;
4040

4141
QueryProcessingStage::Enum getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageSnapshotPtr &, SelectQueryInfo &) const override;
4242

src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -147,13 +147,26 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded(
147147
RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExtension(
148148
const ActionsDAG::Node * predicate,
149149
const ContextPtr & local_context,
150-
std::optional<std::vector<std::string>> ids_of_replicas) const
150+
ClusterPtr cluster) const
151151
{
152152
auto iterator = StorageObjectStorageSource::createFileIterator(
153153
configuration, configuration->getQuerySettings(local_context), object_storage, /* distributed_processing */false,
154154
local_context, predicate, virtual_columns, nullptr, local_context->getFileProgressCallback(), /*ignore_archive_globs=*/true);
155155

156-
auto task_distributor = std::make_shared<StorageObjectStorageStableTaskDistributor>(iterator, ids_of_replicas);
156+
std::vector<std::string> ids_of_hosts;
157+
for (const auto & shard : cluster->getShardsInfo())
158+
{
159+
if (shard.per_replica_pools.empty())
160+
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cluster {} with empty shard {}", cluster->getName(), shard.shard_num);
161+
for (const auto & replica : shard.per_replica_pools)
162+
{
163+
if (!replica)
164+
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cluster {}, shard {} with empty node", cluster->getName(), shard.shard_num);
165+
ids_of_hosts.push_back(replica->getAddress());
166+
}
167+
}
168+
169+
auto task_distributor = std::make_shared<StorageObjectStorageStableTaskDistributor>(iterator, ids_of_hosts);
157170

158171
auto callback = std::make_shared<TaskIterator>(
159172
[task_distributor](size_t number_of_current_replica) mutable -> String {

src/Storages/ObjectStorage/StorageObjectStorageCluster.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ class StorageObjectStorageCluster : public IStorageCluster
2727
RemoteQueryExecutor::Extension getTaskIteratorExtension(
2828
const ActionsDAG::Node * predicate,
2929
const ContextPtr & context,
30-
std::optional<std::vector<std::string>> ids_of_replicas) const override;
30+
ClusterPtr cluster) const override;
3131

3232
String getPathSample(StorageInMemoryMetadata metadata, ContextPtr context);
3333

src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@ namespace DB
88

99
StorageObjectStorageStableTaskDistributor::StorageObjectStorageStableTaskDistributor(
1010
std::shared_ptr<IObjectIterator> iterator_,
11-
std::optional<std::vector<std::string>> ids_of_nodes_)
11+
std::vector<std::string> ids_of_nodes_)
1212
: iterator(std::move(iterator_))
13-
, connection_to_files(ids_of_nodes_.has_value() ? ids_of_nodes_.value().size() : 1)
13+
, connection_to_files(ids_of_nodes_.size())
1414
, ids_of_nodes(ids_of_nodes_)
1515
, iterator_exhausted(false)
1616
{
@@ -38,22 +38,18 @@ std::optional<String> StorageObjectStorageStableTaskDistributor::getNextTask(siz
3838

3939
size_t StorageObjectStorageStableTaskDistributor::getReplicaForFile(const String & file_path)
4040
{
41-
if (!ids_of_nodes.has_value())
42-
throw Exception(ErrorCodes::LOGICAL_ERROR, "No list of nodes inside Task Distributer.");
43-
44-
const auto & ids_of_nodes_value = ids_of_nodes.value();
45-
size_t nodes_count = ids_of_nodes_value.size();
41+
size_t nodes_count = ids_of_nodes.size();
4642

4743
/// Trivial case
4844
if (nodes_count < 2)
4945
return 0;
5046

5147
/// Rendezvous hashing
5248
size_t best_id = 0;
53-
UInt64 best_weight = sipHash64(ids_of_nodes_value[0] + file_path);
49+
UInt64 best_weight = sipHash64(ids_of_nodes[0] + file_path);
5450
for (size_t id = 1; id < nodes_count; ++id)
5551
{
56-
UInt64 weight = sipHash64(ids_of_nodes_value[id] + file_path);
52+
UInt64 weight = sipHash64(ids_of_nodes[id] + file_path);
5753
if (weight > best_weight)
5854
{
5955
best_weight = weight;
@@ -67,6 +63,14 @@ std::optional<String> StorageObjectStorageStableTaskDistributor::getPreQueuedFil
6763
{
6864
std::lock_guard lock(mutex);
6965

66+
if (connection_to_files.size() <= number_of_current_replica)
67+
throw Exception(
68+
ErrorCodes::LOGICAL_ERROR,
69+
"Replica number {} is out of range. Expected range: [0, {})",
70+
number_of_current_replica,
71+
connection_to_files.size()
72+
);
73+
7074
auto & files = connection_to_files[number_of_current_replica];
7175

7276
while (!files.empty())

src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ class StorageObjectStorageStableTaskDistributor
1818
public:
1919
StorageObjectStorageStableTaskDistributor(
2020
std::shared_ptr<IObjectIterator> iterator_,
21-
std::optional<std::vector<std::string>> ids_of_nodes_);
21+
std::vector<std::string> ids_of_nodes_);
2222

2323
std::optional<String> getNextTask(size_t number_of_current_replica);
2424

@@ -33,7 +33,7 @@ class StorageObjectStorageStableTaskDistributor
3333
std::vector<std::vector<String>> connection_to_files;
3434
std::unordered_set<String> unprocessed_files;
3535

36-
std::optional<std::vector<std::string>> ids_of_nodes;
36+
std::vector<std::string> ids_of_nodes;
3737

3838
std::mutex mutex;
3939
bool iterator_exhausted = false;

src/Storages/StorageDistributed.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1164,7 +1164,7 @@ std::optional<QueryPipeline> StorageDistributed::distributedWriteFromClusterStor
11641164
const auto cluster = getCluster();
11651165

11661166
/// Select query is needed for pruining on virtual columns
1167-
auto extension = src_storage_cluster.getTaskIteratorExtension(predicate, local_context);
1167+
auto extension = src_storage_cluster.getTaskIteratorExtension(predicate, local_context, cluster);
11681168

11691169
/// Here we take addresses from destination cluster and assume source table exists on these nodes
11701170
size_t replica_index = 0;

src/Storages/StorageFileCluster.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ void StorageFileCluster::updateQueryToSendIfNeeded(DB::ASTPtr & query, const Sto
8080
RemoteQueryExecutor::Extension StorageFileCluster::getTaskIteratorExtension(
8181
const ActionsDAG::Node * predicate,
8282
const ContextPtr & context,
83-
std::optional<std::vector<std::string>>) const
83+
ClusterPtr) const
8484
{
8585
auto iterator = std::make_shared<StorageFileSource::FilesIterator>(paths, std::nullopt, predicate, getVirtualsList(), context);
8686
auto callback = std::make_shared<TaskIterator>([iter = std::move(iterator)](size_t) mutable -> String { return iter->next(); });

src/Storages/StorageFileCluster.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ class StorageFileCluster : public IStorageCluster
3030
RemoteQueryExecutor::Extension getTaskIteratorExtension(
3131
const ActionsDAG::Node * predicate,
3232
const ContextPtr & context,
33-
std::optional<std::vector<std::string>> ids_of_nodes) const override;
33+
ClusterPtr) const override;
3434

3535
private:
3636
void updateQueryToSendIfNeeded(ASTPtr & query, const StorageSnapshotPtr & storage_snapshot, const ContextPtr & context) override;

src/Storages/StorageReplicatedMergeTree.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6011,7 +6011,7 @@ std::optional<QueryPipeline> StorageReplicatedMergeTree::distributedWriteFromClu
60116011
ContextMutablePtr query_context = Context::createCopy(local_context);
60126012
query_context->increaseDistributedDepth();
60136013

6014-
auto extension = src_storage_cluster->getTaskIteratorExtension(nullptr, local_context);
6014+
auto extension = src_storage_cluster->getTaskIteratorExtension(nullptr, local_context, src_cluster);
60156015

60166016
size_t replica_index = 0;
60176017
for (const auto & replicas : src_cluster->getShardsAddresses())

0 commit comments

Comments
 (0)