Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

clickhouse: flatten_nested no longer works for Array(Tuple) in 24.3 #232

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 29 additions & 13 deletions tests/integration/coordinator/plugins/clickhouse/test_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
RestorableSource,
run_astacus_command,
)
from tests.utils import get_clickhouse_version
from typing import Final
from unittest import mock

Expand Down Expand Up @@ -86,7 +87,11 @@ async def restorable_cluster_manager(
storage_path, zookeeper, clickhouse_cluster, ports, minio_bucket
) as astacus_cluster:
clients = [get_clickhouse_client(service) for service in clickhouse_cluster.services]
await setup_cluster_content(clients, clickhouse_cluster.use_named_collections)
await setup_cluster_content(
clients,
clickhouse_cluster.use_named_collections,
get_clickhouse_version(clickhouse_command),
)
await setup_cluster_users(clients)
run_astacus_command(astacus_cluster, "backup")
# We have destroyed everything except the backup storage dir
Expand Down Expand Up @@ -182,7 +187,9 @@ async def sync_replicated_table(clients: Sequence[ClickHouseClient], table_name:
await client.execute(f"SYSTEM SYNC REPLICA default.{escape_sql_identifier(table_name.encode())} STRICT".encode())


async def setup_cluster_content(clients: Sequence[HttpClickHouseClient], use_named_collections: bool) -> None:
async def setup_cluster_content(
clients: Sequence[HttpClickHouseClient], use_named_collections: bool, clickhouse_version: tuple[int, ...]
) -> None:
for client in clients:
await client.execute(b"DROP DATABASE default SYNC")
await client.execute(
Expand Down Expand Up @@ -231,12 +238,13 @@ async def setup_cluster_content(clients: Sequence[HttpClickHouseClient], use_nam
b"SETTINGS index_granularity=8192 "
b"SETTINGS flatten_nested=0"
)
await clients[0].execute(
b"CREATE TABLE default.array_tuple_flatten (thekey UInt32, thedata Array(Tuple(a UInt32, b UInt32))) "
b"ENGINE = ReplicatedMergeTree ORDER BY (thekey) "
b"SETTINGS index_granularity=8192 "
b"SETTINGS flatten_nested=1"
)
if clickhouse_version < (24, 3):
await clients[0].execute(
b"CREATE TABLE default.array_tuple_flatten (thekey UInt32, thedata Array(Tuple(a UInt32, b UInt32))) "
b"ENGINE = ReplicatedMergeTree ORDER BY (thekey) "
b"SETTINGS index_granularity=8192 "
b"SETTINGS flatten_nested=1"
)
# integrations - note most of these never actually attempt to connect to the remote server.
if await is_engine_available(clients[0], TableEngine.PostgreSQL):
await clients[0].execute(
Expand Down Expand Up @@ -306,7 +314,8 @@ async def setup_cluster_content(clients: Sequence[HttpClickHouseClient], use_nam
await clients[0].execute(b"INSERT INTO default.nested_not_flatten VALUES (123, [(4, 5)])")
await clients[0].execute(b"INSERT INTO default.nested_flatten VALUES (123, [4], [5])")
await clients[0].execute(b"INSERT INTO default.array_tuple_not_flatten VALUES (123, [(4, 5)])")
await clients[0].execute(b"INSERT INTO default.array_tuple_flatten VALUES (123, [4], [5])")
if clickhouse_version < (24, 3):
await clients[0].execute(b"INSERT INTO default.array_tuple_flatten VALUES (123, [4], [5])")
# And some object storage data
await clients[0].execute(b"INSERT INTO default.in_object_storage VALUES (123, 'foo')")
await clients[1].execute(b"INSERT INTO default.in_object_storage VALUES (456, 'bar')")
Expand Down Expand Up @@ -376,16 +385,21 @@ async def test_restores_table_with_nullable_key(restored_cluster: Sequence[Click
assert response == []


async def test_restores_table_with_nested_fields(restored_cluster: Sequence[ClickHouseClient]) -> None:
async def test_restores_table_with_nested_fields(
restored_cluster: Sequence[ClickHouseClient], clickhouse_command: ClickHouseCommand
) -> None:
client = restored_cluster[0]
response = await client.execute(b"SELECT thekey, thedata FROM default.nested_not_flatten ORDER BY thekey")
assert response == [[123, [{"a": 4, "b": 5}]]]
response = await client.execute(b"SELECT thekey, thedata.a, thedata.b FROM default.nested_flatten ORDER BY thekey")
assert response == [[123, [4], [5]]]
response = await client.execute(b"SELECT thekey, thedata FROM default.array_tuple_not_flatten ORDER BY thekey")
assert response == [[123, [{"a": 4, "b": 5}]]]
response = await client.execute(b"SELECT thekey, thedata.a, thedata.b FROM default.array_tuple_flatten ORDER BY thekey")
assert response == [[123, [4], [5]]]
if get_clickhouse_version(clickhouse_command) < (24, 3):
response = await client.execute(
b"SELECT thekey, thedata.a, thedata.b FROM default.array_tuple_flatten ORDER BY thekey"
)
assert response == [[123, [4], [5]]]


async def test_restores_function_table(restored_cluster: Sequence[ClickHouseClient]) -> None:
Expand Down Expand Up @@ -512,7 +526,9 @@ async def test_cleanup_does_not_break_object_storage_disk_files(
storage_path, zookeeper, clickhouse_cluster, ports, minio_bucket
) as astacus_cluster:
clients = [get_clickhouse_client(service) for service in clickhouse_cluster.services]
await setup_cluster_content(clients, clickhouse_cluster.use_named_collections)
await setup_cluster_content(
clients, clickhouse_cluster.use_named_collections, get_clickhouse_version(clickhouse_command)
)
await setup_cluster_users(clients)
run_astacus_command(astacus_cluster, "backup")
run_astacus_command(astacus_cluster, "backup")
Expand Down
9 changes: 8 additions & 1 deletion tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
Copyright (c) 2020 Aiven Ltd
See LICENSE for details
"""

from astacus.common.rohmustorage import RohmuCompressionType, RohmuConfig
from collections.abc import Sequence
from functools import cache
from pathlib import Path
from typing import Final

Expand Down Expand Up @@ -85,11 +87,16 @@ def parse_clickhouse_version(command_output: bytes) -> tuple[int, ...]:
return version_tuple


def get_clickhouse_version(command: Sequence[str | Path]) -> tuple[int, ...]:
@cache
def _get_clickhouse_version(command: tuple[str | Path, ...]) -> tuple[int, ...]:
version_command_output = subprocess.check_output([*command, "--version"])
return parse_clickhouse_version(version_command_output)


def get_clickhouse_version(command: Sequence[str | Path]) -> tuple[int, ...]:
return _get_clickhouse_version(tuple(command))


def is_cassandra_driver_importable() -> bool:
return importlib.util.find_spec("cassandra") is not None

Expand Down
Loading