Skip to content

Commit

Permalink
Parallel replicas. Part 2.5 (ClickHouse#47858)
Browse files Browse the repository at this point in the history
  • Loading branch information
nikitamikhaylov authored Mar 22, 2023
1 parent 6409ca9 commit 301e26b
Show file tree
Hide file tree
Showing 8 changed files with 13 additions and 8 deletions.
2 changes: 1 addition & 1 deletion docker/test/stateful/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ function run_tests()
set +e

if [[ -n "$USE_PARALLEL_REPLICAS" ]] && [[ "$USE_PARALLEL_REPLICAS" -eq 1 ]]; then
clickhouse-test --client="clickhouse-client --use_hedged_requests=0 --allow_experimental_parallel_reading_from_replicas=1 \
clickhouse-test --client="clickhouse-client --use_hedged_requests=0 --allow_experimental_parallel_reading_from_replicas=1 --parallel_replicas_for_non_replicated_merge_tree=1 \
--max_parallel_replicas=100 --cluster_for_parallel_replicas='parallel_replicas'" \
-j 2 --testname --shard --zookeeper --check-zookeeper-session --no-stateless --no-parallel-replicas --hung-check --print-time "${ADDITIONAL_OPTIONS[@]}" \
"$SKIP_TESTS_OPTION" 2>&1 | ts '%Y-%m-%d %H:%M:%S' | tee test_output/test_result.txt
Expand Down
1 change: 1 addition & 0 deletions src/Core/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ class IColumn;
M(String, cluster_for_parallel_replicas, "default", "Cluster for a shard in which current server is located", 0) \
M(Bool, allow_experimental_parallel_reading_from_replicas, false, "If true, ClickHouse will send a SELECT query to all replicas of a table. It will work for any kind on MergeTree table.", 0) \
M(Float, parallel_replicas_single_task_marks_count_multiplier, 2, "A multiplier which will be added during calculation for minimal number of marks to retrieve from coordinator. This will be applied only for remote replicas.", 0) \
M(Bool, parallel_replicas_for_non_replicated_merge_tree, false, "If true, ClickHouse will use parallel replicas algorithm also for non-replicated MergeTree tables", 0) \
\
M(Bool, skip_unavailable_shards, false, "If true, ClickHouse silently skips unavailable shards and nodes unresolvable through DNS. Shard is marked as unavailable when none of the replicas can be reached.", 0) \
\
Expand Down
2 changes: 1 addition & 1 deletion src/Processors/Sources/RemoteSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ ISource::Status RemoteSource::prepare()
return status;
}

if (status == Status::PortFull)
if (status == Status::PortFull || status == Status::Ready)
{
/// Also push an empty chunk to the dependency to signal that we have read data from the remote source
/// or answered an incoming request from a parallel replica
Expand Down
6 changes: 4 additions & 2 deletions src/Storages/StorageMergeTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ void StorageMergeTree::read(
size_t max_block_size,
size_t num_streams)
{
if (local_context->canUseParallelReplicasOnInitiator())
if (local_context->canUseParallelReplicasOnInitiator() && local_context->getSettingsRef().parallel_replicas_for_non_replicated_merge_tree)
{
auto table_id = getStorageID();

Expand Down Expand Up @@ -245,10 +245,12 @@ void StorageMergeTree::read(
}
else
{
const bool enable_parallel_reading = local_context->canUseParallelReplicasOnFollower() && local_context->getSettingsRef().parallel_replicas_for_non_replicated_merge_tree;

if (auto plan = reader.read(
column_names, storage_snapshot, query_info,
local_context, max_block_size, num_streams,
processed_stage, nullptr, /*enable_parallel_reading*/local_context->canUseParallelReplicasOnFollower()))
processed_stage, nullptr, enable_parallel_reading))
query_plan = std::move(*plan);
}

Expand Down
2 changes: 1 addition & 1 deletion tests/queries/0_stateless/02221_parallel_replicas_bug.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. "$CURDIR"/../shell_config.sh


${CLICKHOUSE_CLIENT} --allow_experimental_parallel_reading_from_replicas=1 -nm < "$CURDIR"/01099_parallel_distributed_insert_select.sql > /dev/null
${CLICKHOUSE_CLIENT} --allow_experimental_parallel_reading_from_replicas=1 --parallel_replicas_for_non_replicated_merge_tree=1 -nm < "$CURDIR"/01099_parallel_distributed_insert_select.sql > /dev/null
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ as select * from numbers(1);
# Logical error: 'Coordinator for parallel reading from replicas is not initialized'.
opts=(
--allow_experimental_parallel_reading_from_replicas 1
--parallel_replicas_for_non_replicated_merge_tree 1
--max_parallel_replicas 3

--iterations 1
Expand Down
1 change: 1 addition & 0 deletions tests/queries/0_stateless/02404_memory_bound_merging.sql
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ create table pr_t(a UInt64, b UInt64) engine=MergeTree order by a;
insert into pr_t select number % 1000, number % 1000 from numbers_mt(1e6);

set allow_experimental_parallel_reading_from_replicas = 1;
set parallel_replicas_for_non_replicated_merge_tree = 1;
set max_parallel_replicas = 3;
set use_hedged_requests = 0;
set cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost';
Expand Down
6 changes: 3 additions & 3 deletions tests/queries/1_stateful/00177_memory_bound_merging.sh
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ test1() {
GROUP BY CounterID, URL, EventDate
ORDER BY URL, EventDate
LIMIT 5 OFFSET 10
SETTINGS optimize_aggregation_in_order = 1, enable_memory_bound_merging_of_aggregation_results = 1, allow_experimental_parallel_reading_from_replicas = 1, max_parallel_replicas = 3, use_hedged_requests = 0"
SETTINGS optimize_aggregation_in_order = 1, enable_memory_bound_merging_of_aggregation_results = 1, allow_experimental_parallel_reading_from_replicas = 1, parallel_replicas_for_non_replicated_merge_tree = 1, max_parallel_replicas = 3, use_hedged_requests = 0"
check_replicas_read_in_order $query_id
}

Expand All @@ -51,7 +51,7 @@ test2() {
GROUP BY URL, EventDate
ORDER BY URL, EventDate
LIMIT 5 OFFSET 10
SETTINGS optimize_aggregation_in_order = 1, enable_memory_bound_merging_of_aggregation_results = 1, allow_experimental_parallel_reading_from_replicas = 1, max_parallel_replicas = 3, use_hedged_requests = 0, query_plan_aggregation_in_order = 1"
SETTINGS optimize_aggregation_in_order = 1, enable_memory_bound_merging_of_aggregation_results = 1, allow_experimental_parallel_reading_from_replicas = 1, parallel_replicas_for_non_replicated_merge_tree = 1, max_parallel_replicas = 3, use_hedged_requests = 0, query_plan_aggregation_in_order = 1"
check_replicas_read_in_order $query_id
}

Expand All @@ -67,7 +67,7 @@ test3() {
FROM test.hits
WHERE CounterID = 1704509 AND UserID = 4322253409885123546
GROUP BY URL, EventDate
SETTINGS optimize_aggregation_in_order = 1, enable_memory_bound_merging_of_aggregation_results = 1, allow_experimental_parallel_reading_from_replicas = 1, max_parallel_replicas = 3, use_hedged_requests = 0
SETTINGS optimize_aggregation_in_order = 1, enable_memory_bound_merging_of_aggregation_results = 1, allow_experimental_parallel_reading_from_replicas = 1, parallel_replicas_for_non_replicated_merge_tree = 1, max_parallel_replicas = 3, use_hedged_requests = 0
)
WHERE explain LIKE '%Aggr%Transform%' OR explain LIKE '%InOrder%'"
}
Expand Down

0 comments on commit 301e26b

Please sign in to comment.