Skip to content

feat(tests): support custom product versions #760

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ All notable changes to this project will be documented in this file.
- The defaults from the Docker images themselves will now apply, which will differ from 1000/0 going forward
- This is marked as breaking because tools and policies might exist that require these fields to be set
- Deprecate Trino 470 ([#755]).
- test: support custom versions ([#760]).

### Fixed

Expand All @@ -52,6 +53,7 @@ All notable changes to this project will be documented in this file.
[#748]: https://github.com/stackabletech/trino-operator/pull/748
[#752]: https://github.com/stackabletech/trino-operator/pull/752
[#755]: https://github.com/stackabletech/trino-operator/pull/755
[#760]: https://github.com/stackabletech/trino-operator/pull/760

## [25.3.0] - 2025-03-21

Expand Down
236 changes: 191 additions & 45 deletions tests/templates/kuttl/smoke/check-s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,19 @@
import trino
import argparse
import sys
import re

if not sys.warnoptions:
import warnings
warnings.simplefilter("ignore")


def get_connection(username, password, namespace):
host = 'trino-coordinator-default-0.trino-coordinator-default.' + namespace + '.svc.cluster.local'
host = (
"trino-coordinator-default-0.trino-coordinator-default."
+ namespace
+ ".svc.cluster.local"
)
# If you want to debug this locally use
# kubectl -n kuttl-test-XXX port-forward svc/trino-coordinator-default 8443
# host = '127.0.0.1'
Expand All @@ -18,7 +23,7 @@ def get_connection(username, password, namespace):
host=host,
port=8443,
user=username,
http_scheme='https',
http_scheme="https",
auth=trino.auth.BasicAuthentication(username, password),
session_properties={"query_max_execution_time": "60s"},
)
Expand All @@ -33,34 +38,47 @@ def run_query(connection, query):
return cursor.fetchall()


if __name__ == '__main__':
if __name__ == "__main__":
# Construct an argument parser
all_args = argparse.ArgumentParser()
# Add arguments to the parser
all_args.add_argument("-n", "--namespace", required=True, help="Namespace the test is running in")
all_args.add_argument(
"-n", "--namespace", required=True, help="Namespace the test is running in"
)

args = vars(all_args.parse_args())
namespace = args["namespace"]

print("Starting S3 tests...")
connection = get_connection("admin", "admin", namespace)

trino_version = run_query(connection, "select node_version from system.runtime.nodes where coordinator = true and state = 'active'")[0][0]
print(f"[INFO] Testing against Trino version \"{trino_version}\"")
trino_version = run_query(
connection,
"select node_version from system.runtime.nodes where coordinator = true and state = 'active'",
)[0][0]
print(f'[INFO] Testing against Trino version "{trino_version}"')

assert len(trino_version) >= 3
assert trino_version.isnumeric()
# Strip SDP release suffix from the version string
trino_product_version = re.split(r"-stackable", trino_version, maxsplit=1)[0]

assert len(trino_product_version) >= 3
assert trino_product_version.isnumeric()
assert trino_version == run_query(connection, "select version()")[0][0]

run_query(connection, "CREATE SCHEMA IF NOT EXISTS hive.minio WITH (location = 's3a://trino/')")
run_query(
connection,
"CREATE SCHEMA IF NOT EXISTS hive.minio WITH (location = 's3a://trino/')",
)

run_query(connection, "DROP TABLE IF EXISTS hive.minio.taxi_data")
run_query(connection, "DROP TABLE IF EXISTS hive.minio.taxi_data_copy")
run_query(connection, "DROP TABLE IF EXISTS hive.minio.taxi_data_transformed")
run_query(connection, "DROP TABLE IF EXISTS hive.hdfs.taxi_data_copy")
run_query(connection, "DROP TABLE IF EXISTS iceberg.minio.taxi_data_copy_iceberg")

run_query(connection, """
run_query(
connection,
"""
CREATE TABLE IF NOT EXISTS hive.minio.taxi_data (
vendor_id VARCHAR,
tpep_pickup_datetime VARCHAR,
Expand All @@ -73,13 +91,24 @@ def run_query(connection, query):
format = 'csv',
skip_header_line_count = 1
)
""")
assert run_query(connection, "SELECT COUNT(*) FROM hive.minio.taxi_data")[0][0] == 5000
rows_written = run_query(connection, "CREATE TABLE IF NOT EXISTS hive.minio.taxi_data_copy AS SELECT * FROM hive.minio.taxi_data")[0][0]
""",
)
assert (
run_query(connection, "SELECT COUNT(*) FROM hive.minio.taxi_data")[0][0] == 5000
)
rows_written = run_query(
connection,
"CREATE TABLE IF NOT EXISTS hive.minio.taxi_data_copy AS SELECT * FROM hive.minio.taxi_data",
)[0][0]
assert rows_written == 5000 or rows_written == 0
assert run_query(connection, "SELECT COUNT(*) FROM hive.minio.taxi_data_copy")[0][0] == 5000
assert (
run_query(connection, "SELECT COUNT(*) FROM hive.minio.taxi_data_copy")[0][0]
== 5000
)

rows_written = run_query(connection, """
rows_written = run_query(
connection,
"""
CREATE TABLE IF NOT EXISTS hive.minio.taxi_data_transformed AS
SELECT
CAST(vendor_id as BIGINT) as vendor_id,
Expand All @@ -89,61 +118,178 @@ def run_query(connection, query):
CAST(trip_distance as DOUBLE) as trip_distance,
CAST(ratecode_id as BIGINT) as ratecode_id
FROM hive.minio.taxi_data
""")[0][0]
""",
)[0][0]
assert rows_written == 5000 or rows_written == 0
assert run_query(connection, "SELECT COUNT(*) FROM hive.minio.taxi_data_transformed")[0][0] == 5000
assert (
run_query(connection, "SELECT COUNT(*) FROM hive.minio.taxi_data_transformed")[
0
][0]
== 5000
)

print("[INFO] Testing HDFS")

run_query(connection, "CREATE SCHEMA IF NOT EXISTS hive.hdfs WITH (location = 'hdfs://hdfs/trino/')")
rows_written = run_query(connection, "CREATE TABLE IF NOT EXISTS hive.hdfs.taxi_data_copy AS SELECT * FROM hive.minio.taxi_data")[0][0]
run_query(
connection,
"CREATE SCHEMA IF NOT EXISTS hive.hdfs WITH (location = 'hdfs://hdfs/trino/')",
)
rows_written = run_query(
connection,
"CREATE TABLE IF NOT EXISTS hive.hdfs.taxi_data_copy AS SELECT * FROM hive.minio.taxi_data",
)[0][0]
assert rows_written == 5000 or rows_written == 0
assert run_query(connection, "SELECT COUNT(*) FROM hive.hdfs.taxi_data_copy")[0][0] == 5000
assert (
run_query(connection, "SELECT COUNT(*) FROM hive.hdfs.taxi_data_copy")[0][0]
== 5000
)

print("[INFO] Testing Iceberg")
run_query(connection, "DROP TABLE IF EXISTS iceberg.minio.taxi_data_copy_iceberg") # Clean up the table so that a second run does not fail
assert run_query(connection, """
run_query(
connection, "DROP TABLE IF EXISTS iceberg.minio.taxi_data_copy_iceberg"
) # Clean up the table so that a second run does not fail
assert (
run_query(
connection,
"""
CREATE TABLE IF NOT EXISTS iceberg.minio.taxi_data_copy_iceberg
WITH (partitioning = ARRAY['vendor_id', 'passenger_count'], format = 'parquet')
AS SELECT * FROM hive.minio.taxi_data
""")[0][0] == 5000
""",
)[0][0]
== 5000
)
# Check current count
assert run_query(connection, "SELECT COUNT(*) FROM iceberg.minio.taxi_data_copy_iceberg")[0][0] == 5000
assert run_query(connection, 'SELECT COUNT(*) FROM iceberg.minio."taxi_data_copy_iceberg$snapshots"')[0][0] == 1
assert run_query(connection, 'SELECT COUNT(*) FROM iceberg.minio."taxi_data_copy_iceberg$partitions"')[0][0] == 12
assert run_query(connection, 'SELECT COUNT(*) FROM iceberg.minio."taxi_data_copy_iceberg$files"')[0][0] == 12
assert (
run_query(
connection, "SELECT COUNT(*) FROM iceberg.minio.taxi_data_copy_iceberg"
)[0][0]
== 5000
)
assert (
run_query(
connection,
'SELECT COUNT(*) FROM iceberg.minio."taxi_data_copy_iceberg$snapshots"',
)[0][0]
== 1
)
assert (
run_query(
connection,
'SELECT COUNT(*) FROM iceberg.minio."taxi_data_copy_iceberg$partitions"',
)[0][0]
== 12
)
assert (
run_query(
connection,
'SELECT COUNT(*) FROM iceberg.minio."taxi_data_copy_iceberg$files"',
)[0][0]
== 12
)

assert run_query(connection, "INSERT INTO iceberg.minio.taxi_data_copy_iceberg SELECT * FROM hive.minio.taxi_data")[0][0] == 5000
assert (
run_query(
connection,
"INSERT INTO iceberg.minio.taxi_data_copy_iceberg SELECT * FROM hive.minio.taxi_data",
)[0][0]
== 5000
)

# Check current count
assert run_query(connection, "SELECT COUNT(*) FROM iceberg.minio.taxi_data_copy_iceberg")[0][0] == 10000
assert run_query(connection, 'SELECT COUNT(*) FROM iceberg.minio."taxi_data_copy_iceberg$snapshots"')[0][0] == 2
assert run_query(connection, 'SELECT COUNT(*) FROM iceberg.minio."taxi_data_copy_iceberg$partitions"')[0][0] == 12
assert run_query(connection, 'SELECT COUNT(*) FROM iceberg.minio."taxi_data_copy_iceberg$files"')[0][0] == 24
assert (
run_query(
connection, "SELECT COUNT(*) FROM iceberg.minio.taxi_data_copy_iceberg"
)[0][0]
== 10000
)
assert (
run_query(
connection,
'SELECT COUNT(*) FROM iceberg.minio."taxi_data_copy_iceberg$snapshots"',
)[0][0]
== 2
)
assert (
run_query(
connection,
'SELECT COUNT(*) FROM iceberg.minio."taxi_data_copy_iceberg$partitions"',
)[0][0]
== 12
)
assert (
run_query(
connection,
'SELECT COUNT(*) FROM iceberg.minio."taxi_data_copy_iceberg$files"',
)[0][0]
== 24
)

if trino_version == '377':
if trino_version == "377":
# io.trino.spi.TrinoException: This connector [iceberg] does not support versioned tables
print("[INFO] Skipping the Iceberg tests reading versioned tables for trino version 377 as it does not support versioned tables")
print(
"[INFO] Skipping the Iceberg tests reading versioned tables for trino version 377 as it does not support versioned tables"
)
else:
# Check count for first snapshot
first_snapshot = run_query(connection, 'select snapshot_id from iceberg.minio."taxi_data_copy_iceberg$snapshots" order by committed_at limit 1')[0][0]
assert run_query(connection, f"SELECT COUNT(*) FROM iceberg.minio.taxi_data_copy_iceberg FOR VERSION AS OF {first_snapshot}")[0][0] == 5000
first_snapshot = run_query(
connection,
'select snapshot_id from iceberg.minio."taxi_data_copy_iceberg$snapshots" order by committed_at limit 1',
)[0][0]
assert (
run_query(
connection,
f"SELECT COUNT(*) FROM iceberg.minio.taxi_data_copy_iceberg FOR VERSION AS OF {first_snapshot}",
)[0][0]
== 5000
)

# Compact files
run_query(connection, "ALTER TABLE iceberg.minio.taxi_data_copy_iceberg EXECUTE optimize")
run_query(
connection, "ALTER TABLE iceberg.minio.taxi_data_copy_iceberg EXECUTE optimize"
)

# Check current count
assert run_query(connection, "SELECT COUNT(*) FROM iceberg.minio.taxi_data_copy_iceberg")[0][0] == 10000
assert run_query(connection, 'SELECT COUNT(*) FROM iceberg.minio."taxi_data_copy_iceberg$snapshots"')[0][0] == 3
assert run_query(connection, 'SELECT COUNT(*) FROM iceberg.minio."taxi_data_copy_iceberg$partitions"')[0][0] == 12
assert run_query(connection, 'SELECT COUNT(*) FROM iceberg.minio."taxi_data_copy_iceberg$files"')[0][0] == 12 # Compaction yeah :)
assert (
run_query(
connection, "SELECT COUNT(*) FROM iceberg.minio.taxi_data_copy_iceberg"
)[0][0]
== 10000
)
assert (
run_query(
connection,
'SELECT COUNT(*) FROM iceberg.minio."taxi_data_copy_iceberg$snapshots"',
)[0][0]
== 3
)
assert (
run_query(
connection,
'SELECT COUNT(*) FROM iceberg.minio."taxi_data_copy_iceberg$partitions"',
)[0][0]
== 12
)
assert (
run_query(
connection,
'SELECT COUNT(*) FROM iceberg.minio."taxi_data_copy_iceberg$files"',
)[0][0]
== 12
) # Compaction yeah :)

# Test could be improved by also testing update and deletes

# Test postgres connection
run_query(connection, 'SHOW SCHEMAS IN postgresgeneric')
run_query(connection, 'CREATE SCHEMA IF NOT EXISTS postgresgeneric.tpch')
run_query(connection, 'CREATE TABLE IF NOT EXISTS postgresgeneric.tpch.nation AS SELECT * FROM tpch.tiny.nation')
assert run_query(connection, "SELECT COUNT(*) FROM postgresgeneric.tpch.nation")[0][0] == 25
run_query(connection, "SHOW SCHEMAS IN postgresgeneric")
run_query(connection, "CREATE SCHEMA IF NOT EXISTS postgresgeneric.tpch")
run_query(
connection,
"CREATE TABLE IF NOT EXISTS postgresgeneric.tpch.nation AS SELECT * FROM tpch.tiny.nation",
)
assert (
run_query(connection, "SELECT COUNT(*) FROM postgresgeneric.tpch.nation")[0][0]
== 25
)

print("[SUCCESS] All tests in check-s3.py succeeded!")
8 changes: 6 additions & 2 deletions tests/templates/kuttl/smoke_aws/check-s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import trino
import argparse
import sys
import re

if not sys.warnoptions:
import warnings
Expand Down Expand Up @@ -57,8 +58,11 @@ def run_query(connection, query):
)[0][0]
print(f'[INFO] Testing against Trino version "{trino_version}"')

assert len(trino_version) >= 3
assert trino_version.isnumeric()
# Strip SDP release suffix from the version string
trino_product_version = re.split(r"-stackable", trino_version, maxsplit=1)[0]

assert len(trino_product_version) >= 3
assert trino_product_version.isnumeric()
assert trino_version == run_query(connection, "select version()")[0][0]

# WARNING (@NickLarsenNZ): Hard-coded bucket
Expand Down