Skip to content

Commit

Permalink
Version 7.2.1.1
Browse files Browse the repository at this point in the history
  • Loading branch information
root authored and root committed Sep 18, 2024
1 parent 80ac9cb commit 42a59d9
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 48 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@

## Version 7.2

### Version 7.2.1.1 - 2024-09-18

- Made PIVOT & UNPIVOT processors supportive of more input table/query objects


### Version 7.2.1.0 - 2024-09-09

- Initial release, containing many Kinetica-specific features
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
MAJOR = 7
MINOR = 2
REVISION = 1
ABI_VERSION = 0
ABI_VERSION = 1
12 changes: 3 additions & 9 deletions examples/sqlalchemy_api_examples.py
Original file line number Diff line number Diff line change
Expand Up @@ -1777,16 +1777,10 @@ def select_unpivot(conn, schema):
customer_contact.c.cell_phone.label('Cell')
)

unpivot_clause = Unpivot(
column = text('phone_number'),
for_column = text('phone_type'),
in_columns = ["Home", "Work", "Cell"]
)

query = (
UnpivotSelect(column("name"), column("phone_type"), column("phone_number"))
.source_query(subquery)
.unpivot(unpivot_clause)
.select_from(subquery)
.unpivot("phone_number", "phone_type", ["Home", "Work", "Cell"])
.order_by(column("name"), column("phone_type"))
)

Expand Down Expand Up @@ -2387,7 +2381,7 @@ def create_schema(conn, schema):
"username": param_user,
"password": param_pass,
"bypass_ssl_cert_check": param_bypass_ssl_cert_check,
},
}
)

with sa_engine.connect() as sa_conn:
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "sqlalchemy-kinetica"
version = "7.2.1.0"
version = "7.2.1.1"
dependencies = [
'sqlalchemy>=2.0.31',
'gpudb>=7.2.1.0',
Expand Down
83 changes: 46 additions & 37 deletions sqlalchemy_kinetica/custom_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,10 +147,10 @@ def compile_first_value(element, compiler, **kwargs):


class Pivot(ClauseElement):
def __init__(self, agg_fn, for_col, in_list):
self.agg_fn = agg_fn
self.for_col = for_col
self.in_list = in_list
def __init__(self, value_agg_fn, type_column, type_values):
self.value_agg_fn = value_agg_fn
self.type_column = type_column
self.type_values = type_values


# Extend the Select class to support PIVOT
Expand All @@ -159,81 +159,90 @@ class PivotSelect(Select):

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._source_alias = None
self._pivot = None

def pivot(self, agg_fn, for_col, in_list):
self._pivot = Pivot(agg_fn, for_col, in_list)
def select_from(self, pivot_source):
self._source_alias = f"p_sq_{randint(0,1000000000)}"
self = super().select_from(pivot_source.alias(self._source_alias))
return self


# Custom compilation for PIVOT clause
@compiles(Pivot, 'default')
def compile_pivot(element, compiler, **kwargs):
pivot_sql = f"PIVOT\n(\n\t{element.agg_fn}\n\tFOR {element.for_col} IN ({', '.join(map(str, element.in_list))})\n)"
return pivot_sql
def pivot(self, value_agg_fn, type_column, type_values):
self._pivot = Pivot(value_agg_fn, type_column, type_values)
return self


# Custom compilation for Select that includes the PIVOT clause
@compiles(PivotSelect)
def compile_pivot_select(element, compiler, **kwargs):

# Compile the inner select first
query = compiler.visit_select(element, **kwargs)

# If there's a pivot clause, inject it
if element._pivot is not None:
gb_index = safe_index(query.lower(),'group by')
ob_index = safe_index(query.lower(),'order by')
ap_index = min(gb_index, ob_index)
sq_alias_clause = f" AS {element._source_alias}"
sq_alias_begin_index = safe_index(query, sq_alias_clause)
sq_alias_end_index = sq_alias_begin_index + len(sq_alias_clause) + 1

query = f"{query[:ap_index]}\n{element._pivot}\n{query[ap_index:]}"
pivot_sql = (
f"PIVOT ({element._pivot.value_agg_fn} "
f"FOR {element._pivot.type_column} "
f"IN ({', '.join(map(str, element._pivot.type_values))}))"
)

query = f"{query[:sq_alias_begin_index]}\n{pivot_sql}\n{query[sq_alias_end_index:]}"

return query


# Custom UNPIVOT clause class
class Unpivot:
def __init__(self, unpivoted_value_column, unpivoted_type_column, value_columns):
self.unpivoted_value_column = unpivoted_value_column
self.unpivoted_type_column = unpivoted_type_column
self.value_columns = value_columns


# Extend the Select class to include the UNPIVOT clause
class UnpivotSelect(Select):
inherit_cache = False

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._source_query_alias = None
self._source_alias = None
self._unpivot = None

def source_query(self, unpivot_subquery):
self._source_query_alias = f"up_sq_{randint(0,1000000000)}"
self = self.select_from(unpivot_subquery.subquery(self._source_query_alias))
def select_from(self, unpivot_source):
self._source_alias = f"up_sq_{randint(0,1000000000)}"
self = super().select_from(unpivot_source.alias(self._source_alias))
return self

def unpivot(self, unpivot_clause):
# Add unpivot clause
self._unpivot = unpivot_clause
def unpivot(self, unpivoted_value_column, unpivoted_type_column, value_columns):
self._unpivot = Unpivot(unpivoted_value_column, unpivoted_type_column, value_columns)
return self


# Custom UNPIVOT clause class
class Unpivot:
def __init__(self, column, for_column, in_columns):
self.column = column
self.for_column = for_column
self.in_columns = in_columns


# Custom SQL compilation for the Unpivot clause
@compiles(UnpivotSelect)
def compile_select_with_unpivot(element, compiler, **kwargs):
def compile_unpivot_select(element, compiler, **kwargs):

# Compile the inner select first
query = compiler.visit_select(element, **kwargs)

# If there's an unpivot clause, append it
# If there's an unpivot clause, inject it
if element._unpivot is not None:
sq_alias_clause = f" AS {element._source_query_alias}"
sq_alias_clause = f" AS {element._source_alias}"
sq_alias_begin_index = safe_index(query, sq_alias_clause)
sq_alias_end_index = sq_alias_begin_index + len(sq_alias_clause) + 1

unpivot_sql = (
f"UNPIVOT ({element._unpivot.column} FOR {element._unpivot.for_column} "
f"IN ({', '.join(element._unpivot.in_columns)}))"
f"UNPIVOT ({element._unpivot.unpivoted_value_column} "
f"FOR {element._unpivot.unpivoted_type_column} "
f"IN ({', '.join(element._unpivot.value_columns)}))"
)
return f"{query[:sq_alias_begin_index]}\n{unpivot_sql}\n{query[sq_alias_end_index:]}"

query = f"{query[:sq_alias_begin_index]}\n{unpivot_sql}\n{query[sq_alias_end_index:]}"

return query

Expand Down

0 comments on commit 42a59d9

Please sign in to comment.