From 80f35a0e38877cfc2caf72a0a64f2a3d950c7604 Mon Sep 17 00:00:00 2001 From: Richard Tia Date: Wed, 21 Dec 2022 14:10:19 -0800 Subject: [PATCH] feat: initial commit to add velox as a consumer fix: update java lib path for linux fix: use double instead of decimal in duckdb fix: update substrait to use doubles and varchar feat: velox consumer support for extension function tests fix: add updated schema for isthmus fix: update rounding functions fix: update boolean function tests fix: update schemas for consumers --- tests/conftest.py | 10 +- tests/consumers.py | 47 ++++- tests/context.py | 2 +- tests/data/schema.sql | 8 + .../test_boolean_functions.py | 2 - .../rounding_functions_expr.py | 4 +- .../sql/arithmetic_demical_functions_sql.py | 4 +- .../queries/sql/rounding_functions_sql.py | 4 +- tests/integration/queries/tpch_sql/q1.sql | 12 +- tests/integration/queries/tpch_sql/q5.sql | 2 +- .../tpch_substrait_plans/query_1_plan.json | 174 +++++++----------- .../tpch_substrait_plans/query_2_plan.json | 76 ++++---- .../tpch_substrait_plans/query_3_plan.json | 113 +++++------- .../tpch_substrait_plans/query_4_plan.json | 44 ++--- .../tpch_substrait_plans/query_5_plan.json | 127 +++++-------- tests/integration/test_velox_tpch.py | 131 +++++++++++++ tests/java_definitions.py | 6 +- tests/producers.py | 25 ++- tests/schema_updates.py | 159 ++++++++++++++++ 19 files changed, 594 insertions(+), 356 deletions(-) create mode 100644 tests/data/schema.sql create mode 100644 tests/integration/test_velox_tpch.py create mode 100644 tests/schema_updates.py diff --git a/tests/conftest.py b/tests/conftest.py index e873063d..5964e090 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -2,9 +2,9 @@ import duckdb import pytest - from filelock import FileLock -from tests.consumers import AceroConsumer, DuckDBConsumer + +from tests.consumers import AceroConsumer, DuckDBConsumer, VeloxConsumer from tests.producers import DuckDBProducer, IbisProducer, IsthmusProducer @@ -32,19 +32,19 @@ def pytest_addoption(parser): action="store", default=",".join([x.__name__ for x in CONSUMERS]), help=f"A comma separated list of consumers to run against.", - choices=[x.__name__ for x in CONSUMERS] + choices=[x.__name__ for x in CONSUMERS], ) parser.addoption( "--producer", action="store", default=",".join([x.__name__ for x in PRODUCERS]), help="A comma separated list of producers to run against.", - choices=[x.__name__ for x in PRODUCERS] + choices=[x.__name__ for x in PRODUCERS], ) PRODUCERS = [DuckDBProducer, IbisProducer, IsthmusProducer] -CONSUMERS = [AceroConsumer, DuckDBConsumer] +CONSUMERS = [AceroConsumer, DuckDBConsumer, VeloxConsumer] def _get_consumers(): diff --git a/tests/consumers.py b/tests/consumers.py index da99a265..7c284a8c 100644 --- a/tests/consumers.py +++ b/tests/consumers.py @@ -8,8 +8,10 @@ import pyarrow as pa import pyarrow.parquet as pq import pyarrow.substrait as substrait +import velox from tests.common import SubstraitUtils +from tests.schema_updates import PA_SCHEMA, TABLE_TO_RECREATE class DuckDBConsumer: @@ -68,6 +70,14 @@ def load_tables_from_parquet( create_table_sql = f"CREATE TABLE {table_name} AS SELECT * FROM read_parquet('{file_path}');" self.db_connection.execute(create_table_sql) created_tables.add(table_name) + if table_name in TABLE_TO_RECREATE.keys(): + self.db_connection.query( + f"ALTER TABLE {table_name} RENAME TO {table_name}_orig" + ) + self.db_connection.query(f"{TABLE_TO_RECREATE[table_name]}") + self.db_connection.query( + f"insert into {table_name} select * from {table_name}_orig" + ) table_names.append(table_name) return table_names @@ -93,7 +103,9 @@ def setup(self, db_connection, file_names: Iterable[str]): ) if table_name not in self.created_tables: self.created_tables.add(table_name) - self.tables[table_name] = pq.read_table(file_path) + self.tables[table_name] = pq.read_table( + file_path, schema=PA_SCHEMA[table_name] + ) else: table = pa.table( { @@ -144,3 +156,36 @@ def run_substrait_query(self, substrait_query: bytes) -> pa.Table: result = reader.read_all() return result + + +class VeloxConsumer: + """ + Adapts the Velox Substrait consumer to the test framework. + """ + + def __init__(self): + self.created_tables = set() + self.tables = {} + self.table_provider = lambda names: self.tables[names[0]] + + def setup(self, db_connection, file_names: Iterable[str]): + pass + + def run_substrait_query(self, substrait_query: bytes) -> pa.Table: + """ + Run the substrait plan against Velox. + + Parameters: + substrait_query: + A json formatted byte representation of the substrait query plan. + + Returns: + A pyarrow table resulting from running the substrait query plan. + """ + velox_result = velox.from_json(substrait_query) + + record_batches = [] + for vec in velox_result: + record_batches.append(vec.to_arrow()) + + return pa.Table.from_batches(record_batches) diff --git a/tests/context.py b/tests/context.py index 319d1fde..8ee6d54d 100644 --- a/tests/context.py +++ b/tests/context.py @@ -7,7 +7,7 @@ REPO_DIR = Path(__file__).parent.parent from com.google.protobuf.util import JsonFormat as json_formatter -schema_file = Path.joinpath(REPO_DIR, "tests/data/tpch_parquet/schema.sql") +schema_file = Path.joinpath(REPO_DIR, "tests/data/schema.sql") def produce_isthmus_substrait(sql_string, schema_list): diff --git a/tests/data/schema.sql b/tests/data/schema.sql new file mode 100644 index 00000000..a784f543 --- /dev/null +++ b/tests/data/schema.sql @@ -0,0 +1,8 @@ +CREATE TABLE lineitem(l_orderkey INTEGER, l_partkey INTEGER, l_suppkey INTEGER, l_linenumber INTEGER, l_quantity INTEGER, l_extendedprice DOUBLE, l_discount DOUBLE, l_tax DOUBLE, l_returnflag VARCHAR, l_linestatus VARCHAR, l_shipdate DATE, l_commitdate DATE, l_receiptdate DATE, l_shipinstruct VARCHAR, l_shipmode VARCHAR, l_comment VARCHAR); +CREATE TABLE orders(o_orderkey INTEGER, o_custkey INTEGER, o_orderstatus VARCHAR, o_totalprice DOUBLE, o_orderdate DATE, o_orderpriority VARCHAR, o_clerk VARCHAR, o_shippriority INTEGER, o_comment VARCHAR); +CREATE TABLE partsupp(ps_partkey INTEGER, ps_suppkey INTEGER, ps_availqty INTEGER, ps_supplycost DOUBLE, ps_comment VARCHAR); +CREATE TABLE part(p_partkey INTEGER, p_name VARCHAR, p_mfgr VARCHAR, p_brand VARCHAR, p_type VARCHAR, p_size INTEGER, p_container VARCHAR, p_retailprice DOUBLE, p_comment VARCHAR); +CREATE TABLE customer(c_custkey INTEGER, c_name VARCHAR, c_address VARCHAR, c_nationkey INTEGER, c_phone VARCHAR, c_acctbal DOUBLE, c_mktsegment VARCHAR, c_comment VARCHAR); +CREATE TABLE supplier(s_suppkey INTEGER, s_name VARCHAR, s_address VARCHAR, s_nationkey INTEGER, s_phone VARCHAR, s_acctbal DOUBLE, s_comment VARCHAR); +CREATE TABLE nation(n_nationkey INTEGER, n_name VARCHAR, n_regionkey INTEGER, n_comment VARCHAR); +CREATE TABLE region(r_regionkey INTEGER, r_name VARCHAR, r_comment VARCHAR); diff --git a/tests/functional/extension_functions/test_boolean_functions.py b/tests/functional/extension_functions/test_boolean_functions.py index 61a61c63..4e40e697 100644 --- a/tests/functional/extension_functions/test_boolean_functions.py +++ b/tests/functional/extension_functions/test_boolean_functions.py @@ -45,7 +45,6 @@ def test_boolean_functions( ibis_expr: Callable[[Table], Table], producer, consumer, - partsupp ) -> None: substrait_function_test( self.db_connection, @@ -55,6 +54,5 @@ def test_boolean_functions( ibis_expr, producer, consumer, - partsupp, self.table_t, ) diff --git a/tests/functional/queries/ibis_expressions/rounding_functions_expr.py b/tests/functional/queries/ibis_expressions/rounding_functions_expr.py index ee2332b0..13e017c4 100644 --- a/tests/functional/queries/ibis_expressions/rounding_functions_expr.py +++ b/tests/functional/queries/ibis_expressions/rounding_functions_expr.py @@ -1,11 +1,11 @@ def ceil_expr(partsupp): new_col = partsupp.ps_supplycost.ceil().name("CEIL_SUPPLYCOST") - return partsupp[partsupp.ps_supplycost, new_col] + return partsupp[new_col] def floor_expr(partsupp): new_col = partsupp.ps_supplycost.floor().name("FLOOR_SUPPLYCOST") - return partsupp[partsupp.ps_supplycost, new_col] + return partsupp[new_col] IBIS_SCALAR = { diff --git a/tests/functional/queries/sql/arithmetic_demical_functions_sql.py b/tests/functional/queries/sql/arithmetic_demical_functions_sql.py index f1e4f99a..f4cdb9f2 100644 --- a/tests/functional/queries/sql/arithmetic_demical_functions_sql.py +++ b/tests/functional/queries/sql/arithmetic_demical_functions_sql.py @@ -41,14 +41,14 @@ SQL_AGGREGATE = { "sum": ( """ - SELECT sum(L_EXTENDEDPRICE) AS SUM_EXTENDEDPRICE + SELECT CAST(sum(L_EXTENDEDPRICE) AS DECIMAL(15,2)) AS SUM_EXTENDEDPRICE FROM '{}'; """, [DuckDBProducer], ), "avg": ( """ - SELECT avg(L_EXTENDEDPRICE) AS AVG_EXTENDEDPRICE + SELECT CAST(avg(L_EXTENDEDPRICE) AS DECIMAL(15,2)) AVG_EXTENDEDPRICE FROM '{}'; """, [DuckDBProducer], diff --git a/tests/functional/queries/sql/rounding_functions_sql.py b/tests/functional/queries/sql/rounding_functions_sql.py index 42b73c5e..304f46ac 100644 --- a/tests/functional/queries/sql/rounding_functions_sql.py +++ b/tests/functional/queries/sql/rounding_functions_sql.py @@ -3,14 +3,14 @@ SQL_SCALAR = { "ceil": ( """ - SELECT PS_SUPPLYCOST, ceil(PS_SUPPLYCOST) AS CEIL_SUPPLYCOST + SELECT ceil(PS_SUPPLYCOST) AS CEIL_SUPPLYCOST FROM '{}'; """, [DuckDBProducer], ), "floor": ( """ - SELECT PS_SUPPLYCOST, floor(PS_SUPPLYCOST) AS FLOOR_SUPPLYCOST + SELECT floor(PS_SUPPLYCOST) AS FLOOR_SUPPLYCOST FROM '{}'; """, [DuckDBProducer], diff --git a/tests/integration/queries/tpch_sql/q1.sql b/tests/integration/queries/tpch_sql/q1.sql index 72c85fd7..7e746d28 100644 --- a/tests/integration/queries/tpch_sql/q1.sql +++ b/tests/integration/queries/tpch_sql/q1.sql @@ -2,12 +2,12 @@ SELECT l_returnflag, l_linestatus, sum(l_quantity) AS sum_qty, - sum(l_extendedprice) AS sum_base_price, - sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price, - sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge, - avg(l_quantity) AS avg_qty, - avg(l_extendedprice) AS avg_price, - avg(l_discount) AS avg_disc, + round(sum(l_extendedprice), 2) AS sum_base_price, + round(sum(l_extendedprice * (1 - l_discount)), 2) AS sum_disc_price, + round(sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)), 2) AS sum_charge, + round(avg(l_quantity), 2) AS avg_qty, + round(avg(l_extendedprice), 2) AS avg_price, + round(avg(l_discount), 2) AS avg_disc, count(*) AS count_order FROM '{}' diff --git a/tests/integration/queries/tpch_sql/q5.sql b/tests/integration/queries/tpch_sql/q5.sql index 22bb50c7..203e16fb 100644 --- a/tests/integration/queries/tpch_sql/q5.sql +++ b/tests/integration/queries/tpch_sql/q5.sql @@ -1,6 +1,6 @@ SELECT n_name, - sum(l_extendedprice * (1 - l_discount)) AS revenue + round(sum(l_extendedprice * (1 - l_discount)), 2) AS revenue FROM '{}', '{}', '{}', '{}', '{}', '{}' WHERE diff --git a/tests/integration/queries/tpch_substrait_plans/query_1_plan.json b/tests/integration/queries/tpch_substrait_plans/query_1_plan.json index 39d5e25c..53b72883 100644 --- a/tests/integration/queries/tpch_substrait_plans/query_1_plan.json +++ b/tests/integration/queries/tpch_substrait_plans/query_1_plan.json @@ -4,7 +4,7 @@ "uri": "/functions_aggregate_generic.yaml" }, { "extensionUriAnchor": 2, - "uri": "/functions_arithmetic_decimal.yaml" + "uri": "/functions_arithmetic.yaml" }, { "extensionUriAnchor": 1, "uri": "/functions_datetime.yaml" @@ -25,31 +25,31 @@ "extensionFunction": { "extensionUriReference": 2, "functionAnchor": 2, - "name": "multiply:opt_decimal_decimal" + "name": "multiply:opt_fp64_fp64" } }, { "extensionFunction": { "extensionUriReference": 2, "functionAnchor": 3, - "name": "subtract:opt_decimal_decimal" + "name": "subtract:opt_fp64_fp64" } }, { "extensionFunction": { "extensionUriReference": 2, "functionAnchor": 4, - "name": "add:opt_decimal_decimal" + "name": "add:opt_fp64_fp64" } }, { "extensionFunction": { "extensionUriReference": 2, "functionAnchor": 5, - "name": "sum:opt_decimal" + "name": "sum:opt_fp64" } }, { "extensionFunction": { "extensionUriReference": 2, "functionAnchor": 6, - "name": "avg:opt_decimal" + "name": "avg:opt_fp64" } }, { "extensionFunction": { @@ -75,7 +75,9 @@ "input": { "project": { "common": { - + "emit": { + "outputMapping": [16, 17, 18, 19, 20, 21, 22] + } }, "input": { "filter": { @@ -95,95 +97,87 @@ "types": [{ "i64": { "typeVariationReference": 0, - "nullability": "NULLABILITY_NULLABLE" + "nullability": "NULLABILITY_REQUIRED" } }, { "i64": { "typeVariationReference": 0, - "nullability": "NULLABILITY_NULLABLE" + "nullability": "NULLABILITY_REQUIRED" } }, { "i64": { "typeVariationReference": 0, - "nullability": "NULLABILITY_NULLABLE" + "nullability": "NULLABILITY_REQUIRED" } }, { - "i32": { + "i64": { "typeVariationReference": 0, - "nullability": "NULLABILITY_NULLABLE" + "nullability": "NULLABILITY_REQUIRED" } }, { - "decimal": { - "scale": 0, - "precision": 19, + "fp64": { "typeVariationReference": 0, - "nullability": "NULLABILITY_NULLABLE" + "nullability": "NULLABILITY_REQUIRED" } }, { - "decimal": { - "scale": 0, - "precision": 19, + "fp64": { "typeVariationReference": 0, - "nullability": "NULLABILITY_NULLABLE" + "nullability": "NULLABILITY_REQUIRED" } }, { - "decimal": { - "scale": 0, - "precision": 19, + "fp64": { "typeVariationReference": 0, - "nullability": "NULLABILITY_NULLABLE" + "nullability": "NULLABILITY_REQUIRED" } }, { - "decimal": { - "scale": 0, - "precision": 19, + "fp64": { "typeVariationReference": 0, - "nullability": "NULLABILITY_NULLABLE" + "nullability": "NULLABILITY_REQUIRED" } }, { - "fixedChar": { + "varchar": { "length": 1, "typeVariationReference": 0, - "nullability": "NULLABILITY_NULLABLE" + "nullability": "NULLABILITY_REQUIRED" } }, { - "fixedChar": { + "varchar": { "length": 1, "typeVariationReference": 0, - "nullability": "NULLABILITY_NULLABLE" + "nullability": "NULLABILITY_REQUIRED" } }, { "date": { "typeVariationReference": 0, - "nullability": "NULLABILITY_NULLABLE" + "nullability": "NULLABILITY_REQUIRED" } }, { "date": { "typeVariationReference": 0, - "nullability": "NULLABILITY_NULLABLE" + "nullability": "NULLABILITY_REQUIRED" } }, { "date": { "typeVariationReference": 0, - "nullability": "NULLABILITY_NULLABLE" + "nullability": "NULLABILITY_REQUIRED" } }, { - "fixedChar": { + "varchar": { "length": 25, "typeVariationReference": 0, - "nullability": "NULLABILITY_NULLABLE" + "nullability": "NULLABILITY_REQUIRED" } }, { - "fixedChar": { + "varchar": { "length": 10, "typeVariationReference": 0, - "nullability": "NULLABILITY_NULLABLE" + "nullability": "NULLABILITY_REQUIRED" } }, { "varchar": { "length": 44, "typeVariationReference": 0, - "nullability": "NULLABILITY_NULLABLE" + "nullability": "NULLABILITY_REQUIRED" } }], "typeVariationReference": 0, @@ -207,7 +201,7 @@ "outputType": { "bool": { "typeVariationReference": 0, - "nullability": "NULLABILITY_NULLABLE" + "nullability": "NULLABILITY_REQUIRED" } }, "arguments": [{ @@ -306,11 +300,9 @@ "functionReference": 2, "args": [], "outputType": { - "decimal": { - "scale": 0, - "precision": 19, + "fp64": { "typeVariationReference": 0, - "nullability": "NULLABILITY_NULLABLE" + "nullability": "NULLABILITY_REQUIRED" } }, "arguments": [{ @@ -331,22 +323,18 @@ "functionReference": 3, "args": [], "outputType": { - "decimal": { - "scale": 0, - "precision": 19, + "fp64": { "typeVariationReference": 0, - "nullability": "NULLABILITY_NULLABLE" + "nullability": "NULLABILITY_REQUIRED" } }, "arguments": [{ "value": { "cast": { "type": { - "decimal": { - "scale": 0, - "precision": 19, + "fp64": { "typeVariationReference": 0, - "nullability": "NULLABILITY_NULLABLE" + "nullability": "NULLABILITY_REQUIRED" } }, "input": { @@ -381,11 +369,9 @@ "functionReference": 2, "args": [], "outputType": { - "decimal": { - "scale": 0, - "precision": 19, + "fp64": { "typeVariationReference": 0, - "nullability": "NULLABILITY_NULLABLE" + "nullability": "NULLABILITY_REQUIRED" } }, "arguments": [{ @@ -394,11 +380,9 @@ "functionReference": 2, "args": [], "outputType": { - "decimal": { - "scale": 0, - "precision": 19, + "fp64": { "typeVariationReference": 0, - "nullability": "NULLABILITY_NULLABLE" + "nullability": "NULLABILITY_REQUIRED" } }, "arguments": [{ @@ -419,22 +403,18 @@ "functionReference": 3, "args": [], "outputType": { - "decimal": { - "scale": 0, - "precision": 19, + "fp64": { "typeVariationReference": 0, - "nullability": "NULLABILITY_NULLABLE" + "nullability": "NULLABILITY_REQUIRED" } }, "arguments": [{ "value": { "cast": { "type": { - "decimal": { - "scale": 0, - "precision": 19, + "fp64": { "typeVariationReference": 0, - "nullability": "NULLABILITY_NULLABLE" + "nullability": "NULLABILITY_REQUIRED" } }, "input": { @@ -471,22 +451,18 @@ "functionReference": 4, "args": [], "outputType": { - "decimal": { - "scale": 0, - "precision": 19, + "fp64": { "typeVariationReference": 0, - "nullability": "NULLABILITY_NULLABLE" + "nullability": "NULLABILITY_REQUIRED" } }, "arguments": [{ "value": { "cast": { "type": { - "decimal": { - "scale": 0, - "precision": 19, + "fp64": { "typeVariationReference": 0, - "nullability": "NULLABILITY_NULLABLE" + "nullability": "NULLABILITY_REQUIRED" } }, "input": { @@ -559,11 +535,9 @@ "sorts": [], "phase": "AGGREGATION_PHASE_INITIAL_TO_RESULT", "outputType": { - "decimal": { - "scale": 0, - "precision": 19, + "fp64": { "typeVariationReference": 0, - "nullability": "NULLABILITY_NULLABLE" + "nullability": "NULLABILITY_REQUIRED" } }, "invocation": "AGGREGATION_INVOCATION_ALL", @@ -588,11 +562,9 @@ "sorts": [], "phase": "AGGREGATION_PHASE_INITIAL_TO_RESULT", "outputType": { - "decimal": { - "scale": 0, - "precision": 19, + "fp64": { "typeVariationReference": 0, - "nullability": "NULLABILITY_NULLABLE" + "nullability": "NULLABILITY_REQUIRED" } }, "invocation": "AGGREGATION_INVOCATION_ALL", @@ -617,11 +589,9 @@ "sorts": [], "phase": "AGGREGATION_PHASE_INITIAL_TO_RESULT", "outputType": { - "decimal": { - "scale": 0, - "precision": 19, + "fp64": { "typeVariationReference": 0, - "nullability": "NULLABILITY_NULLABLE" + "nullability": "NULLABILITY_REQUIRED" } }, "invocation": "AGGREGATION_INVOCATION_ALL", @@ -646,11 +616,9 @@ "sorts": [], "phase": "AGGREGATION_PHASE_INITIAL_TO_RESULT", "outputType": { - "decimal": { - "scale": 0, - "precision": 19, + "fp64": { "typeVariationReference": 0, - "nullability": "NULLABILITY_NULLABLE" + "nullability": "NULLABILITY_REQUIRED" } }, "invocation": "AGGREGATION_INVOCATION_ALL", @@ -675,11 +643,9 @@ "sorts": [], "phase": "AGGREGATION_PHASE_INITIAL_TO_RESULT", "outputType": { - "decimal": { - "scale": 0, - "precision": 19, + "fp64": { "typeVariationReference": 0, - "nullability": "NULLABILITY_NULLABLE" + "nullability": "NULLABILITY_REQUIRED" } }, "invocation": "AGGREGATION_INVOCATION_ALL", @@ -704,11 +670,9 @@ "sorts": [], "phase": "AGGREGATION_PHASE_INITIAL_TO_RESULT", "outputType": { - "decimal": { - "scale": 0, - "precision": 19, + "fp64": { "typeVariationReference": 0, - "nullability": "NULLABILITY_NULLABLE" + "nullability": "NULLABILITY_REQUIRED" } }, "invocation": "AGGREGATION_INVOCATION_ALL", @@ -733,11 +697,9 @@ "sorts": [], "phase": "AGGREGATION_PHASE_INITIAL_TO_RESULT", "outputType": { - "decimal": { - "scale": 0, - "precision": 19, + "fp64": { "typeVariationReference": 0, - "nullability": "NULLABILITY_NULLABLE" + "nullability": "NULLABILITY_REQUIRED" } }, "invocation": "AGGREGATION_INVOCATION_ALL", @@ -806,4 +768,4 @@ } }], "expectedTypeUrls": [] -} +} \ No newline at end of file diff --git a/tests/integration/queries/tpch_substrait_plans/query_2_plan.json b/tests/integration/queries/tpch_substrait_plans/query_2_plan.json index 3d4b9855..58428870 100644 --- a/tests/integration/queries/tpch_substrait_plans/query_2_plan.json +++ b/tests/integration/queries/tpch_substrait_plans/query_2_plan.json @@ -1,13 +1,13 @@ { "extensionUris": [{ + "extensionUriAnchor": 4, + "uri": "/functions_arithmetic.yaml" + }, { "extensionUriAnchor": 1, "uri": "/functions_boolean.yaml" }, { "extensionUriAnchor": 3, "uri": "/functions_string.yaml" - }, { - "extensionUriAnchor": 4, - "uri": "/functions_arithmetic_decimal.yaml" }, { "extensionUriAnchor": 2, "uri": "/functions_comparison.yaml" @@ -34,7 +34,7 @@ "extensionFunction": { "extensionUriReference": 4, "functionAnchor": 3, - "name": "min:decimal" + "name": "min:fp64" } }], "relations": [{ @@ -109,13 +109,13 @@ "nullability": "NULLABILITY_NULLABLE" } }, { - "fixedChar": { + "varchar": { "length": 25, "typeVariationReference": 0, "nullability": "NULLABILITY_NULLABLE" } }, { - "fixedChar": { + "varchar": { "length": 10, "typeVariationReference": 0, "nullability": "NULLABILITY_NULLABLE" @@ -132,17 +132,15 @@ "nullability": "NULLABILITY_NULLABLE" } }, { - "fixedChar": { + "varchar": { "length": 10, "typeVariationReference": 0, "nullability": "NULLABILITY_NULLABLE" } }, { - "decimal": { - "scale": 0, - "precision": 19, + "fp64": { "typeVariationReference": 0, - "nullability": "NULLABILITY_NULLABLE" + "nullability": "NULLABILITY_REQUIRED" } }, { "varchar": { @@ -180,7 +178,7 @@ "nullability": "NULLABILITY_REQUIRED" } }, { - "fixedChar": { + "varchar": { "length": 25, "typeVariationReference": 0, "nullability": "NULLABILITY_NULLABLE" @@ -197,17 +195,15 @@ "nullability": "NULLABILITY_REQUIRED" } }, { - "fixedChar": { + "varchar": { "length": 15, "typeVariationReference": 0, "nullability": "NULLABILITY_NULLABLE" } }, { - "decimal": { - "scale": 0, - "precision": 19, + "fp64": { "typeVariationReference": 0, - "nullability": "NULLABILITY_NULLABLE" + "nullability": "NULLABILITY_REQUIRED" } }, { "varchar": { @@ -265,11 +261,9 @@ "nullability": "NULLABILITY_NULLABLE" } }, { - "decimal": { - "scale": 0, - "precision": 19, + "fp64": { "typeVariationReference": 0, - "nullability": "NULLABILITY_NULLABLE" + "nullability": "NULLABILITY_REQUIRED" } }, { "varchar": { @@ -317,7 +311,7 @@ "nullability": "NULLABILITY_REQUIRED" } }, { - "fixedChar": { + "varchar": { "length": 25, "typeVariationReference": 0, "nullability": "NULLABILITY_NULLABLE" @@ -373,7 +367,7 @@ "nullability": "NULLABILITY_REQUIRED" } }, { - "fixedChar": { + "varchar": { "length": 25, "typeVariationReference": 0, "nullability": "NULLABILITY_NULLABLE" @@ -564,7 +558,7 @@ }, "input": { "literal": { - "fixedChar": "%BRASS", + "varchar": "%BRASS", "nullable": false, "typeVariationReference": 0 } @@ -678,7 +672,7 @@ "value": { "cast": { "type": { - "fixedChar": { + "varchar": { "length": 25, "typeVariationReference": 0, "nullability": "NULLABILITY_REQUIRED" @@ -686,7 +680,7 @@ }, "input": { "literal": { - "fixedChar": "EUROPE", + "varchar": "EUROPE", "nullable": false, "typeVariationReference": 0 } @@ -786,11 +780,9 @@ "nullability": "NULLABILITY_NULLABLE" } }, { - "decimal": { - "scale": 0, - "precision": 19, + "fp64": { "typeVariationReference": 0, - "nullability": "NULLABILITY_NULLABLE" + "nullability": "NULLABILITY_REQUIRED" } }, { "varchar": { @@ -828,7 +820,7 @@ "nullability": "NULLABILITY_REQUIRED" } }, { - "fixedChar": { + "varchar": { "length": 25, "typeVariationReference": 0, "nullability": "NULLABILITY_NULLABLE" @@ -845,17 +837,15 @@ "nullability": "NULLABILITY_REQUIRED" } }, { - "fixedChar": { + "varchar": { "length": 15, "typeVariationReference": 0, "nullability": "NULLABILITY_NULLABLE" } }, { - "decimal": { - "scale": 0, - "precision": 19, + "fp64": { "typeVariationReference": 0, - "nullability": "NULLABILITY_NULLABLE" + "nullability": "NULLABILITY_REQUIRED" } }, { "varchar": { @@ -903,7 +893,7 @@ "nullability": "NULLABILITY_REQUIRED" } }, { - "fixedChar": { + "varchar": { "length": 25, "typeVariationReference": 0, "nullability": "NULLABILITY_NULLABLE" @@ -959,7 +949,7 @@ "nullability": "NULLABILITY_REQUIRED" } }, { - "fixedChar": { + "varchar": { "length": 25, "typeVariationReference": 0, "nullability": "NULLABILITY_NULLABLE" @@ -1185,7 +1175,7 @@ "value": { "cast": { "type": { - "fixedChar": { + "varchar": { "length": 25, "typeVariationReference": 0, "nullability": "NULLABILITY_REQUIRED" @@ -1193,7 +1183,7 @@ }, "input": { "literal": { - "fixedChar": "EUROPE", + "varchar": "EUROPE", "nullable": false, "typeVariationReference": 0 } @@ -1232,9 +1222,7 @@ "sorts": [], "phase": "AGGREGATION_PHASE_INITIAL_TO_RESULT", "outputType": { - "decimal": { - "scale": 0, - "precision": 19, + "fp64": { "typeVariationReference": 0, "nullability": "NULLABILITY_NULLABLE" } @@ -1414,4 +1402,4 @@ } }], "expectedTypeUrls": [] -} +} \ No newline at end of file diff --git a/tests/integration/queries/tpch_substrait_plans/query_3_plan.json b/tests/integration/queries/tpch_substrait_plans/query_3_plan.json index 4ca074d2..f2c967fe 100644 --- a/tests/integration/queries/tpch_substrait_plans/query_3_plan.json +++ b/tests/integration/queries/tpch_substrait_plans/query_3_plan.json @@ -1,10 +1,10 @@ { "extensionUris": [{ + "extensionUriAnchor": 4, + "uri": "/functions_arithmetic.yaml" + }, { "extensionUriAnchor": 1, "uri": "/functions_boolean.yaml" - }, { - "extensionUriAnchor": 4, - "uri": "/functions_arithmetic_decimal.yaml" }, { "extensionUriAnchor": 3, "uri": "/functions_datetime.yaml" @@ -40,19 +40,19 @@ "extensionFunction": { "extensionUriReference": 4, "functionAnchor": 4, - "name": "multiply:opt_decimal_decimal" + "name": "multiply:opt_fp64_fp64" } }, { "extensionFunction": { "extensionUriReference": 4, "functionAnchor": 5, - "name": "subtract:opt_decimal_decimal" + "name": "subtract:opt_fp64_fp64" } }, { "extensionFunction": { "extensionUriReference": 4, "functionAnchor": 6, - "name": "sum:opt_decimal" + "name": "sum:opt_fp64" } }], "relations": [{ @@ -139,20 +139,18 @@ "nullability": "NULLABILITY_REQUIRED" } }, { - "fixedChar": { + "varchar": { "length": 15, "typeVariationReference": 0, "nullability": "NULLABILITY_NULLABLE" } }, { - "decimal": { - "scale": 0, - "precision": 19, + "fp64": { "typeVariationReference": 0, - "nullability": "NULLABILITY_NULLABLE" + "nullability": "NULLABILITY_REQUIRED" } }, { - "fixedChar": { + "varchar": { "length": 10, "typeVariationReference": 0, "nullability": "NULLABILITY_NULLABLE" @@ -198,17 +196,15 @@ "nullability": "NULLABILITY_REQUIRED" } }, { - "fixedChar": { + "varchar": { "length": 1, "typeVariationReference": 0, "nullability": "NULLABILITY_NULLABLE" } }, { - "decimal": { - "scale": 0, - "precision": 19, + "fp64": { "typeVariationReference": 0, - "nullability": "NULLABILITY_NULLABLE" + "nullability": "NULLABILITY_REQUIRED" } }, { "date": { @@ -216,13 +212,13 @@ "nullability": "NULLABILITY_NULLABLE" } }, { - "fixedChar": { + "varchar": { "length": 15, "typeVariationReference": 0, "nullability": "NULLABILITY_NULLABLE" } }, { - "fixedChar": { + "varchar": { "length": 15, "typeVariationReference": 0, "nullability": "NULLABILITY_NULLABLE" @@ -293,41 +289,33 @@ "nullability": "NULLABILITY_NULLABLE" } }, { - "decimal": { - "scale": 0, - "precision": 19, + "fp64": { "typeVariationReference": 0, - "nullability": "NULLABILITY_NULLABLE" + "nullability": "NULLABILITY_REQUIRED" } }, { - "decimal": { - "scale": 0, - "precision": 19, + "fp64": { "typeVariationReference": 0, - "nullability": "NULLABILITY_NULLABLE" + "nullability": "NULLABILITY_REQUIRED" } }, { - "decimal": { - "scale": 0, - "precision": 19, + "fp64": { "typeVariationReference": 0, - "nullability": "NULLABILITY_NULLABLE" + "nullability": "NULLABILITY_REQUIRED" } }, { - "decimal": { - "scale": 0, - "precision": 19, + "fp64": { "typeVariationReference": 0, - "nullability": "NULLABILITY_NULLABLE" + "nullability": "NULLABILITY_REQUIRED" } }, { - "fixedChar": { + "varchar": { "length": 1, "typeVariationReference": 0, "nullability": "NULLABILITY_NULLABLE" } }, { - "fixedChar": { + "varchar": { "length": 1, "typeVariationReference": 0, "nullability": "NULLABILITY_NULLABLE" @@ -348,13 +336,13 @@ "nullability": "NULLABILITY_NULLABLE" } }, { - "fixedChar": { + "varchar": { "length": 25, "typeVariationReference": 0, "nullability": "NULLABILITY_NULLABLE" } }, { - "fixedChar": { + "varchar": { "length": 10, "typeVariationReference": 0, "nullability": "NULLABILITY_NULLABLE" @@ -425,22 +413,13 @@ } }, { "value": { - "cast": { - "type": { - "fixedChar": { - "length": 10, - "typeVariationReference": 0, - "nullability": "NULLABILITY_REQUIRED" - } - }, - "input": { - "literal": { - "fixedChar": "HOUSEHOLD", - "nullable": false, - "typeVariationReference": 0 - } + "literal": { + "varChar": { + "value": "HOUSEHOLD", + "length": 10 }, - "failureBehavior": "FAILURE_BEHAVIOR_UNSPECIFIED" + "nullable": false, + "typeVariationReference": 0 } } }] @@ -630,11 +609,9 @@ "functionReference": 4, "args": [], "outputType": { - "decimal": { - "scale": 0, - "precision": 19, + "fp64": { "typeVariationReference": 0, - "nullability": "NULLABILITY_NULLABLE" + "nullability": "NULLABILITY_REQUIRED" } }, "arguments": [{ @@ -655,22 +632,18 @@ "functionReference": 5, "args": [], "outputType": { - "decimal": { - "scale": 0, - "precision": 19, + "fp64": { "typeVariationReference": 0, - "nullability": "NULLABILITY_NULLABLE" + "nullability": "NULLABILITY_REQUIRED" } }, "arguments": [{ "value": { "cast": { "type": { - "decimal": { - "scale": 0, - "precision": 19, + "fp64": { "typeVariationReference": 0, - "nullability": "NULLABILITY_NULLABLE" + "nullability": "NULLABILITY_REQUIRED" } }, "input": { @@ -743,11 +716,9 @@ "sorts": [], "phase": "AGGREGATION_PHASE_INITIAL_TO_RESULT", "outputType": { - "decimal": { - "scale": 0, - "precision": 19, + "fp64": { "typeVariationReference": 0, - "nullability": "NULLABILITY_NULLABLE" + "nullability": "NULLABILITY_REQUIRED" } }, "invocation": "AGGREGATION_INVOCATION_ALL", @@ -848,4 +819,4 @@ } }], "expectedTypeUrls": [] -} +} \ No newline at end of file diff --git a/tests/integration/queries/tpch_substrait_plans/query_4_plan.json b/tests/integration/queries/tpch_substrait_plans/query_4_plan.json index 6e946cef..3fe9dd4e 100644 --- a/tests/integration/queries/tpch_substrait_plans/query_4_plan.json +++ b/tests/integration/queries/tpch_substrait_plans/query_4_plan.json @@ -90,17 +90,15 @@ "nullability": "NULLABILITY_REQUIRED" } }, { - "fixedChar": { + "varchar": { "length": 1, "typeVariationReference": 0, "nullability": "NULLABILITY_NULLABLE" } }, { - "decimal": { - "scale": 0, - "precision": 19, + "fp64": { "typeVariationReference": 0, - "nullability": "NULLABILITY_NULLABLE" + "nullability": "NULLABILITY_REQUIRED" } }, { "date": { @@ -108,13 +106,13 @@ "nullability": "NULLABILITY_NULLABLE" } }, { - "fixedChar": { + "varchar": { "length": 15, "typeVariationReference": 0, "nullability": "NULLABILITY_NULLABLE" } }, { - "fixedChar": { + "varchar": { "length": 15, "typeVariationReference": 0, "nullability": "NULLABILITY_NULLABLE" @@ -286,41 +284,33 @@ "nullability": "NULLABILITY_NULLABLE" } }, { - "decimal": { - "scale": 0, - "precision": 19, + "fp64": { "typeVariationReference": 0, - "nullability": "NULLABILITY_NULLABLE" + "nullability": "NULLABILITY_REQUIRED" } }, { - "decimal": { - "scale": 0, - "precision": 19, + "fp64": { "typeVariationReference": 0, - "nullability": "NULLABILITY_NULLABLE" + "nullability": "NULLABILITY_REQUIRED" } }, { - "decimal": { - "scale": 0, - "precision": 19, + "fp64": { "typeVariationReference": 0, - "nullability": "NULLABILITY_NULLABLE" + "nullability": "NULLABILITY_REQUIRED" } }, { - "decimal": { - "scale": 0, - "precision": 19, + "fp64": { "typeVariationReference": 0, - "nullability": "NULLABILITY_NULLABLE" + "nullability": "NULLABILITY_REQUIRED" } }, { - "fixedChar": { + "varchar": { "length": 1, "typeVariationReference": 0, "nullability": "NULLABILITY_NULLABLE" } }, { - "fixedChar": { + "varchar": { "length": 1, "typeVariationReference": 0, "nullability": "NULLABILITY_NULLABLE" @@ -341,13 +331,13 @@ "nullability": "NULLABILITY_NULLABLE" } }, { - "fixedChar": { + "varchar": { "length": 25, "typeVariationReference": 0, "nullability": "NULLABILITY_NULLABLE" } }, { - "fixedChar": { + "varchar": { "length": 10, "typeVariationReference": 0, "nullability": "NULLABILITY_NULLABLE" diff --git a/tests/integration/queries/tpch_substrait_plans/query_5_plan.json b/tests/integration/queries/tpch_substrait_plans/query_5_plan.json index b8e4bb36..921944cf 100644 --- a/tests/integration/queries/tpch_substrait_plans/query_5_plan.json +++ b/tests/integration/queries/tpch_substrait_plans/query_5_plan.json @@ -1,10 +1,10 @@ { "extensionUris": [{ + "extensionUriAnchor": 4, + "uri": "/functions_arithmetic.yaml" + }, { "extensionUriAnchor": 1, "uri": "/functions_boolean.yaml" - }, { - "extensionUriAnchor": 4, - "uri": "/functions_arithmetic_decimal.yaml" }, { "extensionUriAnchor": 3, "uri": "/functions_datetime.yaml" @@ -40,19 +40,19 @@ "extensionFunction": { "extensionUriReference": 4, "functionAnchor": 4, - "name": "multiply:opt_decimal_decimal" + "name": "multiply:opt_fp64_fp64" } }, { "extensionFunction": { "extensionUriReference": 4, "functionAnchor": 5, - "name": "subtract:opt_decimal_decimal" + "name": "subtract:opt_fp64_fp64" } }, { "extensionFunction": { "extensionUriReference": 4, "functionAnchor": 6, - "name": "sum:opt_decimal" + "name": "sum:opt_fp64" } }], "relations": [{ @@ -144,20 +144,18 @@ "nullability": "NULLABILITY_REQUIRED" } }, { - "fixedChar": { + "varchar": { "length": 15, "typeVariationReference": 0, "nullability": "NULLABILITY_NULLABLE" } }, { - "decimal": { - "scale": 0, - "precision": 19, + "fp64": { "typeVariationReference": 0, - "nullability": "NULLABILITY_NULLABLE" + "nullability": "NULLABILITY_REQUIRED" } }, { - "fixedChar": { + "varchar": { "length": 10, "typeVariationReference": 0, "nullability": "NULLABILITY_NULLABLE" @@ -203,17 +201,15 @@ "nullability": "NULLABILITY_REQUIRED" } }, { - "fixedChar": { + "varchar": { "length": 1, "typeVariationReference": 0, "nullability": "NULLABILITY_NULLABLE" } }, { - "decimal": { - "scale": 0, - "precision": 19, + "fp64": { "typeVariationReference": 0, - "nullability": "NULLABILITY_NULLABLE" + "nullability": "NULLABILITY_REQUIRED" } }, { "date": { @@ -221,13 +217,13 @@ "nullability": "NULLABILITY_NULLABLE" } }, { - "fixedChar": { + "varchar": { "length": 15, "typeVariationReference": 0, "nullability": "NULLABILITY_NULLABLE" } }, { - "fixedChar": { + "varchar": { "length": 15, "typeVariationReference": 0, "nullability": "NULLABILITY_NULLABLE" @@ -298,41 +294,33 @@ "nullability": "NULLABILITY_NULLABLE" } }, { - "decimal": { - "scale": 0, - "precision": 19, + "fp64": { "typeVariationReference": 0, - "nullability": "NULLABILITY_NULLABLE" + "nullability": "NULLABILITY_REQUIRED" } }, { - "decimal": { - "scale": 0, - "precision": 19, + "fp64": { "typeVariationReference": 0, - "nullability": "NULLABILITY_NULLABLE" + "nullability": "NULLABILITY_REQUIRED" } }, { - "decimal": { - "scale": 0, - "precision": 19, + "fp64": { "typeVariationReference": 0, - "nullability": "NULLABILITY_NULLABLE" + "nullability": "NULLABILITY_REQUIRED" } }, { - "decimal": { - "scale": 0, - "precision": 19, + "fp64": { "typeVariationReference": 0, - "nullability": "NULLABILITY_NULLABLE" + "nullability": "NULLABILITY_REQUIRED" } }, { - "fixedChar": { + "varchar": { "length": 1, "typeVariationReference": 0, "nullability": "NULLABILITY_NULLABLE" } }, { - "fixedChar": { + "varchar": { "length": 1, "typeVariationReference": 0, "nullability": "NULLABILITY_NULLABLE" @@ -353,13 +341,13 @@ "nullability": "NULLABILITY_NULLABLE" } }, { - "fixedChar": { + "varchar": { "length": 25, "typeVariationReference": 0, "nullability": "NULLABILITY_NULLABLE" } }, { - "fixedChar": { + "varchar": { "length": 10, "typeVariationReference": 0, "nullability": "NULLABILITY_NULLABLE" @@ -410,7 +398,7 @@ "nullability": "NULLABILITY_REQUIRED" } }, { - "fixedChar": { + "varchar": { "length": 25, "typeVariationReference": 0, "nullability": "NULLABILITY_NULLABLE" @@ -427,17 +415,15 @@ "nullability": "NULLABILITY_REQUIRED" } }, { - "fixedChar": { + "varchar": { "length": 15, "typeVariationReference": 0, "nullability": "NULLABILITY_NULLABLE" } }, { - "decimal": { - "scale": 0, - "precision": 19, + "fp64": { "typeVariationReference": 0, - "nullability": "NULLABILITY_NULLABLE" + "nullability": "NULLABILITY_REQUIRED" } }, { "varchar": { @@ -485,7 +471,7 @@ "nullability": "NULLABILITY_REQUIRED" } }, { - "fixedChar": { + "varchar": { "length": 25, "typeVariationReference": 0, "nullability": "NULLABILITY_NULLABLE" @@ -536,7 +522,7 @@ "nullability": "NULLABILITY_REQUIRED" } }, { - "fixedChar": { + "varchar": { "length": 25, "typeVariationReference": 0, "nullability": "NULLABILITY_NULLABLE" @@ -830,22 +816,13 @@ } }, { "value": { - "cast": { - "type": { - "fixedChar": { - "length": 25, - "typeVariationReference": 0, - "nullability": "NULLABILITY_REQUIRED" - } - }, - "input": { - "literal": { - "fixedChar": "ASIA", - "nullable": false, - "typeVariationReference": 0 - } + "literal": { + "varChar": { + "value": "ASIA", + "length": 25 }, - "failureBehavior": "FAILURE_BEHAVIOR_UNSPECIFIED" + "nullable": false, + "typeVariationReference": 0 } } }] @@ -961,11 +938,9 @@ "functionReference": 4, "args": [], "outputType": { - "decimal": { - "scale": 0, - "precision": 19, + "fp64": { "typeVariationReference": 0, - "nullability": "NULLABILITY_NULLABLE" + "nullability": "NULLABILITY_REQUIRED" } }, "arguments": [{ @@ -986,22 +961,18 @@ "functionReference": 5, "args": [], "outputType": { - "decimal": { - "scale": 0, - "precision": 19, + "fp64": { "typeVariationReference": 0, - "nullability": "NULLABILITY_NULLABLE" + "nullability": "NULLABILITY_REQUIRED" } }, "arguments": [{ "value": { "cast": { "type": { - "decimal": { - "scale": 0, - "precision": 19, + "fp64": { "typeVariationReference": 0, - "nullability": "NULLABILITY_NULLABLE" + "nullability": "NULLABILITY_REQUIRED" } }, "input": { @@ -1054,11 +1025,9 @@ "sorts": [], "phase": "AGGREGATION_PHASE_INITIAL_TO_RESULT", "outputType": { - "decimal": { - "scale": 0, - "precision": 19, + "fp64": { "typeVariationReference": 0, - "nullability": "NULLABILITY_NULLABLE" + "nullability": "NULLABILITY_REQUIRED" } }, "invocation": "AGGREGATION_INVOCATION_ALL", @@ -1099,4 +1068,4 @@ } }], "expectedTypeUrls": [] -} +} \ No newline at end of file diff --git a/tests/integration/test_velox_tpch.py b/tests/integration/test_velox_tpch.py new file mode 100644 index 00000000..c3d94f99 --- /dev/null +++ b/tests/integration/test_velox_tpch.py @@ -0,0 +1,131 @@ +from typing import Iterable + +import duckdb +import pyarrow as pa +import pytest +from pyarrow import compute + +from ..common import SubstraitUtils +from ..consumers import AceroConsumer, DuckDBConsumer, VeloxConsumer +from ..parametrization import custom_parametrization +from ..verification import verify_equals +from .queries.tpch_test_cases import TPCH_QUERY_TESTS + + +@pytest.mark.usefixtures("prepare_tpch_parquet_data") +class TestVeloxConsumer: + """ + Test Class for testing Substrait using Velox as a consumer. + """ + + @staticmethod + @pytest.fixture(scope="class", autouse=True) + def setup_teardown_class(request): + cls = request.cls + cls.db_connection = duckdb.connect() + cls.db_connection.execute("INSTALL substrait") + cls.db_connection.execute("LOAD substrait") + cls.velox_consumer = VeloxConsumer() + cls.utils = SubstraitUtils() + cls.created_tables = set() + + yield + + cls.db_connection.close() + + @custom_parametrization(TPCH_QUERY_TESTS) + def test_isthmus_substrait_plan( + self, + test_name: str, + file_names: list, + sql_query: str, + substrait_query: str, + sort_results: bool = False, + ) -> None: + """ + 1. Format the substrait_query by replacing the 'local_files' 'uri_file' + path with the full path to the parquet data. + 2. Format the SQL query to work with DuckDB by setting the 'Table' + Parameters to be the relative files paths for parquet data. + 3. Run the substrait query plan. + 4. Execute the SQL on DuckDB. + 5. Compare substrait query plan results against the results of + running the SQL on DuckDB. + + Parameters: + test_name: + Name of test. + file_names: + List of parquet files. + sql_query: + SQL query. + substrait_query: + Substrait query. + sort_results: + Whether to sort the results before comparison. + """ + # Format the substrait query to include the parquet file paths. + # Calculate the result of running the substrait query plan. + substrait_query = self.utils.format_substrait_query(substrait_query, file_names) + subtrait_query_result_tb = self.velox_consumer.run_substrait_query( + substrait_query + ) + print(subtrait_query_result_tb) + + # Reformat the sql query to be used by duck db by inserting all the + # parquet filepaths where the table names should be. + # Calculate results to verify against by running the SQL query on DuckDB + sql_query = self.utils.format_sql_query(sql_query, file_names) + duckdb_query_result_tb = self.db_connection.query(f"{sql_query}").arrow() + + col_names = [x.lower() for x in subtrait_query_result_tb.column_names] + exp_col_names = [x.lower() for x in duckdb_query_result_tb.column_names] + + # Sort results by specified column names + if sort_results: + subtrait_sort_col = subtrait_query_result_tb.column_names[0] + subtrait_query_result_tb = arrow_sort_tb_values( + subtrait_query_result_tb, sortby=[subtrait_sort_col] + ) + duckdb_sort_col = duckdb_query_result_tb.column_names[0] + duckdb_query_result_tb = arrow_sort_tb_values( + duckdb_query_result_tb, sortby=[duckdb_sort_col] + ) + + # Verify results between substrait plan query and sql running against + # duckdb are equal. + verify_equals( + col_names, + exp_col_names, + message=f"Actual column names: \n{col_names} \n" + f"are not equal to the expected" + f"column names: \n{exp_col_names}", + ) + verify_equals( + subtrait_query_result_tb, + duckdb_query_result_tb, + message=f"Result table: \n{subtrait_query_result_tb} \n" + f"is not equal to the expected " + f"table: \n{duckdb_query_result_tb}", + ) + + +def arrow_sort_tb_values(table: pa.Table, sortby: Iterable[str]) -> pa.Table: + """ + Sort the pyarrow table by the given list of columns. + + Parameters: + table: + Original pyarrow Table. + sortby: + Columns to sort the results by. + + Returns: + Pyarrow Table sorted by given columns. + + """ + table_sorted_indexes = pa.compute.bottom_k_unstable( + table, sort_keys=sortby, k=len(table) + ) + table_sorted = table.take(table_sorted_indexes) + return table_sorted diff --git a/tests/java_definitions.py b/tests/java_definitions.py index 99a6c88d..88b04646 100644 --- a/tests/java_definitions.py +++ b/tests/java_definitions.py @@ -1,5 +1,6 @@ import os from pathlib import Path +from sys import platform import jpype @@ -14,7 +15,10 @@ jvm_path = java_home_path if not os.path.isfile(jvm_path): - jvm_path = java_home_path + "/lib/libjli.dylib" + if platform == "darwin": + jvm_path = java_home_path + "/lib/libjli.dylib" + elif platform == "linux": + jvm_path = java_home_path + "/lib/server/libjvm.so" jpype.startJVM("--enable-preview", convertStrings=True, jvmpath=jvm_path) jpype.addClassPath(isthmus_jars) diff --git a/tests/producers.py b/tests/producers.py index a9272abf..00badae2 100644 --- a/tests/producers.py +++ b/tests/producers.py @@ -9,6 +9,7 @@ from tests.common import SubstraitUtils from tests.context import get_schema, produce_isthmus_substrait +from tests.schema_updates import TABLE_TO_RECREATE class DuckDBProducer: @@ -46,7 +47,9 @@ def produce_substrait( def format_sql(self, created_tables, sql_query, file_names): if len(file_names) > 0: - table_names = load_tables_from_parquet(self.db_connection, created_tables, file_names) + table_names = load_tables_from_parquet( + self.db_connection, created_tables, file_names + ) sql_query = sql_query.format(*table_names) return sql_query @@ -90,7 +93,9 @@ def produce_substrait( def format_sql(self, created_tables, sql_query, file_names): if len(file_names) > 0: - table_names = load_tables_from_parquet(self.db_connection, created_tables, file_names) + table_names = load_tables_from_parquet( + self.db_connection, created_tables, file_names + ) sql_query = sql_query.format(*table_names) return sql_query @@ -110,9 +115,7 @@ def __init__(self, db_connection=None): def set_db_connection(self, db_connection): self.db_connection = db_connection - def produce_substrait( - self, sql_query: str, consumer, ibis_expr: str = None - ) -> str: + def produce_substrait(self, sql_query: str, consumer, ibis_expr: str = None) -> str: """ Produce the Isthmus substrait plan using the given SQL query. @@ -134,7 +137,9 @@ def format_sql(self, created_tables, sql_query, file_names): sql_query = sql_query.replace("'t'", "t") if len(file_names) > 0: self.file_names = file_names - table_names = load_tables_from_parquet(self.db_connection, created_tables, file_names) + table_names = load_tables_from_parquet( + self.db_connection, created_tables, file_names + ) sql_query = sql_query.format(*table_names) return sql_query @@ -166,6 +171,14 @@ def load_tables_from_parquet( create_table_sql = f"CREATE TABLE {table_name} AS SELECT * FROM read_parquet('{file_path}');" db_connection.execute(create_table_sql) created_tables.add(table_name) + if table_name in TABLE_TO_RECREATE.keys(): + db_connection.query( + f"ALTER TABLE {table_name} RENAME TO {table_name}_orig" + ) + db_connection.query(f"{TABLE_TO_RECREATE[table_name]}") + db_connection.query( + f"insert into {table_name} select * from {table_name}_orig" + ) table_names.append(table_name) return table_names diff --git a/tests/schema_updates.py b/tests/schema_updates.py new file mode 100644 index 00000000..f1f0e204 --- /dev/null +++ b/tests/schema_updates.py @@ -0,0 +1,159 @@ +from pathlib import Path +import pyarrow as pa + + +REPO_DIR = Path(__file__).parent.parent +schema_file = Path.joinpath(REPO_DIR, "tests/data/schema.sql") + +ORDERS_TABLE = """ +CREATE TABLE orders( + o_orderkey INTEGER NOT NULL, + o_custkey INTEGER NOT NULL, + o_orderstatus VARCHAR NOT NULL, + o_totalprice DOUBLE NOT NULL, + o_orderdate DATE NOT NULL, + o_orderpriority VARCHAR NOT NULL, + o_clerk VARCHAR NOT NULL, + o_shippriority INTEGER NOT NULL, + o_comment VARCHAR NOT NULL); +""" +ORDERS_PA_SCHEMA = pa.schema([ + pa.field('o_orderkey', pa.int32()), + pa.field('o_custkey', pa.int32()), + pa.field('o_orderstatus', pa.string()), + pa.field('o_totalprice', pa.float64()), + pa.field('o_orderdate', pa.date32()), +]) +LINEITEM_TABLE = """ +CREATE TABLE lineitem( + l_orderkey INTEGER NOT NULL, + l_partkey INTEGER NOT NULL, + l_suppkey INTEGER NOT NULL, + l_linenumber INTEGER NOT NULL, + l_quantity DOUBLE NOT NULL, + l_extendedprice DOUBLE NOT NULL, + l_discount DOUBLE NOT NULL, + l_tax DOUBLE NOT NULL, + l_returnflag VARCHAR NOT NULL, + l_linestatus VARCHAR NOT NULL, + l_shipdate DATE NOT NULL, + l_commitdate DATE NOT NULL, + l_receiptdate DATE NOT NULL, + l_shipinstruct VARCHAR NOT NULL, + l_shipmode VARCHAR NOT NULL, + l_comment VARCHAR NOT NULL); +""" +LINEITEM_PA_SCHEMA = pa.schema([ + pa.field('l_orderkey', pa.int32()), + pa.field('l_partkey', pa.int32()), + pa.field('l_suppkey', pa.int32()), + pa.field('l_linenumber', pa.int32()), + pa.field('l_quantity', pa.float64()), + pa.field('l_extendedprice', pa.float64()), + pa.field('l_discount', pa.float64()), + pa.field('l_tax', pa.float64()), + pa.field('l_returnflag', pa.string()), + pa.field('l_linestatus', pa.string()), + pa.field('l_shipdate', pa.date32()), + pa.field('l_commitdate', pa.date32()), + pa.field('l_receiptdate', pa.date32()), + pa.field('l_shipinstruct', pa.string()), + pa.field('l_shipmode', pa.string()), + pa.field('l_comment', pa.string()), +]) +PARTSUPP_TABLE = """ +CREATE TABLE partsupp( + ps_partkey INTEGER NOT NULL, + ps_suppkey INTEGER NOT NULL, + ps_availqty INTEGER NOT NULL, + ps_supplycost DOUBLE NOT NULL, + ps_comment VARCHAR NOT NULL); +""" +PARTSUPP_PA_SCHEMA = pa.schema([ + pa.field('ps_partkey', pa.int32()), + pa.field('ps_suppkey', pa.int32()), + pa.field('ps_availqty', pa.int32()), + pa.field('ps_supplycost', pa.float64()), + pa.field('ps_comment', pa.string()), +]) +PART_TABLE = """ +CREATE TABLE part( + p_partkey INTEGER NOT NULL, + p_name VARCHAR NOT NULL, + p_mfgr VARCHAR NOT NULL, + p_brand VARCHAR NOT NULL, + p_type VARCHAR NOT NULL, + p_size INTEGER NOT NULL, + p_container VARCHAR NOT NULL, + p_retailprice DOUBLE NOT NULL, + p_comment VARCHAR NOT NULL); +""" +PART_PA_SCHEMA = pa.schema([ + pa.field('ps_partkey', pa.int32()), + pa.field('p_name', pa.string()), + pa.field('p_mfgr', pa.string()), + pa.field('p_brand', pa.string()), + pa.field('p_type', pa.string()), + pa.field('p_size', pa.int32()), + pa.field('p_container', pa.string()), + pa.field('p_retailprice', pa.float64()), + pa.field('p_comment', pa.string()), +]) +CUSTOMER_TABLE = """ +CREATE TABLE customer( + c_custkey INTEGER NOT NULL, + c_name VARCHAR NOT NULL, + c_address VARCHAR NOT NULL, + c_nationkey INTEGER NOT NULL, + c_phone VARCHAR NOT NULL, + c_acctbal DOUBLE NOT NULL, + c_mktsegment VARCHAR NOT NULL, + c_comment VARCHAR NOT NULL); +""" +CUSTOMER_PA_SCHEMA = pa.schema([ + pa.field('c_custkey', pa.int32()), + pa.field('c_name', pa.string()), + pa.field('c_address', pa.string()), + pa.field('c_nationkey', pa.int32()), + pa.field('c_phone', pa.string()), + pa.field('c_acctbal', pa.float64()), + pa.field('c_mktsegment', pa.string()), + pa.field('c_comment', pa.string()), +]) +SUPPLIER_TABLE = """ + CREATE TABLE supplier( + s_suppkey INTEGER NOT NULL, + s_name VARCHAR NOT NULL, + s_address VARCHAR NOT NULL, + s_nationkey INTEGER NOT NULL, + s_phone VARCHAR NOT NULL, + s_acctbal DOUBLE NOT NULL, + s_comment VARCHAR NOT NULL); +""" +CUSTOMER_PA_SCHEMA = pa.schema([ + pa.field('s_suppkey', pa.int32()), + pa.field('s_name', pa.string()), + pa.field('s_address', pa.string()), + pa.field('s_nationkey', pa.int32()), + pa.field('s_phone', pa.string()), + pa.field('s_acctbal', pa.float64()), + pa.field('c_comment', pa.string()), +]) +TABLE_TO_RECREATE = { + "orders": ORDERS_TABLE, + "lineitem": LINEITEM_TABLE, + "partsupp": PARTSUPP_TABLE, + "part": PART_TABLE, + "customer": CUSTOMER_TABLE, + "supplier": SUPPLIER_TABLE, +} +PA_SCHEMA = { + "orders": ORDERS_PA_SCHEMA, + "lineitem": LINEITEM_PA_SCHEMA, + "partsupp": PARTSUPP_PA_SCHEMA, + "part": PART_PA_SCHEMA, + "customer": CUSTOMER_PA_SCHEMA, + "supplier": PARTSUPP_PA_SCHEMA, + "nation": None, + "region": None +}