Skip to content

Commit

Permalink
Worked on dialects for the insert query, tested on sqlite3, added columns to fetch method on query
Browse files Browse the repository at this point in the history
  • Loading branch information
Pablo Minue committed Jul 7, 2024
1 parent 53be34a commit 82120d9
Show file tree
Hide file tree
Showing 11 changed files with 244 additions and 66 deletions.
5 changes: 3 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
/.pytest_cache
__pycache__
.pyc
testings.py
*.pyc
testings.py
*.sqlite3
Binary file added dist/pysqltools-0.2.6-py3-none-any.whl
Binary file not shown.
Binary file added dist/pysqltools-0.2.6.tar.gz
Binary file not shown.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "pysqltools"
version = "0.2.5"
version = "0.2.6"
description = "PySQLTools"
authors = ["Pablo Minué"]
license = "None"
Expand Down
8 changes: 6 additions & 2 deletions pysqltools/src/connection/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ def execute(self, sql: Query) -> None:
self.conn.commit()
except:
lg.log.warning("Connection can not be commited")
except:
raise ConnectionException
except Exception as e:
raise ConnectionException(e)

def fetch(self, sql: Query, dataframe: bool = False):
"""
Expand All @@ -84,7 +84,11 @@ def fetch(self, sql: Query, dataframe: bool = False):
else:
cursor = self.conn.cursor()
cursor.execute(sql.sql)
if hasattr(cursor, "description"):
columns = [i[0] for i in cursor.description]
if hasattr(cursor, "fetchall"):
if dataframe:
return pd.DataFrame(cursor.fetchall(), columns=columns)
return cursor.fetchall()
else:
rows = []
Expand Down
1 change: 1 addition & 0 deletions pysqltools/src/sql/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@
"float64": "double",
"bool": "bool",
"datetime64": "timestamp",
"datetime64[ns]": "timestamp",
}
218 changes: 165 additions & 53 deletions pysqltools/src/sql/insert.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,59 +12,159 @@
lg = PabLog("Insert")


def prepare_value(val: Any) -> Any:
def prepare_value(val: Any, dialect: str) -> Any:
"""
Format value from Python types to SQL Types
"""
if isinstance(val, bool):
if dialect.lower().__contains__("trino"):
if isinstance(val, str):
return f"'{val}'"
if isinstance(val, bool):
return val
if isinstance(val, dict):
val = str(val).replace("'", '"')
if isinstance(val, pd.Timestamp):
val = "TIMESTAMP '" + str(val) + "'"
if isinstance(val, date):
val = "DATE '" + str(val) + "'"
if isinstance(val, list):
val = "ARRAY " + str(val)
if isinstance(val, float):
val = "DOUBLE '" + str(val) + "'"
if isinstance(val, int):
val = "INT '" + str(val) + "'"
if pd.isnull(val):
val = "NULL"

try:
if (
"'" in val
and "DOUBLE" not in val
and "INT" not in val
and "TIMESTAMP" not in val
and "DATE" not in val
):
val = val.replace("'", "''")
except TypeError:
lg.log.warning("Not Adding Quotes")

return val
if isinstance(val, dict):
val = str(val).replace("'", '"')
if isinstance(val, pd.Timestamp):
val = "TIMESTAMP '" + str(val) + "'"
if isinstance(val, date):
val = "DATE '" + str(val) + "'"
if isinstance(val, list):
val = "ARRAY " + str(val)
if isinstance(val, float):
val = "DOUBLE '" + str(val) + "'"
if isinstance(val, int):
val = "INT '" + str(val) + "'"
if pd.isnull(val):
val = "NULL"

try:
if (
"'" in val
and "DOUBLE" not in val
and "INT" not in val
and "TIMESTAMP" not in val
and "DATE" not in val
):
val = val.replace("'", "''")
except TypeError:
lg.log.warning("Not Adding Quotes")
if dialect.lower().__contains__("mysql"):
if isinstance(val, str):
return f"'{val}'"
if isinstance(val, bool):
return bool(val)
if isinstance(val, dict):
val = str(val).replace("'", '"')
if isinstance(val, pd.Timestamp):
val = f"'{val}'"
if isinstance(val, date):
val = f"'{val}'"
if isinstance(val, list):
val = f"'{str(val)}'"
if isinstance(val, float):
val = str(val)
if isinstance(val, int):
val = str(val)
if pd.isnull(val):
val = "NULL"

return val

if dialect.lower().__contains__("sqlite"):
if isinstance(val, str):
return f"'{val}'"
if isinstance(val, bool):
return bool(val)
if isinstance(val, dict):
val = str(val).replace("'", '"')
if isinstance(val, pd.Timestamp):
val = f"'{val}'"
if isinstance(val, date):
val = f"'{val}'"
if isinstance(val, list):
val = f"'{str(val)}'"
if isinstance(val, float):
val = str(val)
if isinstance(val, int):
val = str(val)
if pd.isnull(val):
val = "NULL"

return val
return val

if dialect.lower().__contains__("ibm"):
if isinstance(val, str):
return f"'{val}'"
if isinstance(val, bool):
return bool(val)
if isinstance(val, dict):
val = str(val).replace("'", '"')
if isinstance(val, pd.Timestamp):
val = f"TIMESTAMP '{val}'"
if isinstance(val, date):
val = f"DATE '{val}'"
if isinstance(val, list):
val = f"'{str(val)}'"
if isinstance(val, float):
val = str(val)
if isinstance(val, int):
val = str(val)
if pd.isnull(val):
val = "NULL"

def join_values(data: list[Any]) -> str:
return val

if dialect.lower().__contains__("sqlserver"):
if isinstance(val, str):
return f"'{val}'"
if isinstance(val, bool):
return bool(val)
if isinstance(val, dict):
val = str(val).replace("'", '"')
if isinstance(val, pd.Timestamp):
val = f"'{val}'"
if isinstance(val, date):
val = f"'{val}'"
if isinstance(val, list):
val = f"'{str(val)}'"
if isinstance(val, float):
val = str(val)
if isinstance(val, int):
val = str(val)
if pd.isnull(val):
val = "NULL"

return val

if dialect.lower().__contains__("mariadb"):
if isinstance(val, str):
return f"'{val}'"
if isinstance(val, bool):
return bool(val)
if isinstance(val, dict):
val = str(val).replace("'", '"')
if isinstance(val, pd.Timestamp):
val = f"'{val}'"
if isinstance(val, date):
val = f"'{val}'"
if isinstance(val, list):
val = f"'{str(val)}'"
if isinstance(val, float):
val = str(val)
if isinstance(val, int):
val = str(val)
if pd.isnull(val):
val = "NULL"
return val


def join_values(data: list[Any], dialect: str) -> str:
"""
Create a String for the VALUES () SQL Syntax
"""
clean_list = []
for val in data:
if isinstance(val, bool):
val = str(val)
if (
isinstance(val, str)
and "DOUBLE" not in val
and "INT" not in val
and "TIMESTAMP" not in val
and "DATE" not in val
and "ARRAY" not in val
) and val.lower() not in ["true", "false"]:
val = "'" + val + "'"
try:
if "NULL" in val:
val = "NULL"
Expand All @@ -76,18 +176,22 @@ def join_values(data: list[Any]) -> str:
return "(" + str_data + ")"


def pandas_to_sql(df: pd.DataFrame) -> Generator[str, None, None]:
def pandas_to_sql(df: pd.DataFrame, dialect: str) -> Generator[str, None, None]:
"""
Generator to get one row insert statement
"""
for row in df.values:
data_list = [prepare_value(x) for x in row]
data_string = join_values(data_list)
data_list = [prepare_value(x, dialect=dialect) for x in row]
data_string = join_values(data_list, dialect=dialect)
yield data_string


def generate_insert_query(
df: pd.DataFrame, table: str = None, schema: str = None, batch_size: int = 5000
df: pd.DataFrame,
table: str = None,
schema: str = None,
batch_size: int = 5000,
dialect: str = "trino",
) -> Generator[Query, None, None]:
if df.empty:
raise TypeError("DataFrame can not be empty")
Expand All @@ -96,7 +200,7 @@ def generate_insert_query(
percentage = round(100 * previous_iter / len(df), 2)
lg.log.info("Generating Insert Queries... %s", percentage)
batch = df.iloc[previous_iter : previous_iter + batch_size]
data_points = list(pandas_to_sql(batch))
data_points = list(pandas_to_sql(batch, dialect))
data_points_string = ",".join(data_points)
if schema and table:
table = f"{schema}.{table}"
Expand All @@ -111,21 +215,29 @@ def generate_insert_query(
def insert_pandas(
df: pd.DataFrame,
connection: SQLConnection,
table: str,
schema: str,
batch_size: int,
table: str,
schema: str = "",
dialect: str = "trino",
):
if not table and schema:
raise TypeError("Table and Schema need to be provided")
with Progress() as progress:
iterations = len(df) / batch_size
task1 = progress.add_task("[red]Generating Queries...", total=1000)
task2 = progress.add_task("[green]Inserting Data...", total=iterations)
task3 = progress.add_task("[cyan]Finishing...", total=1000)
task1 = progress.add_task("[red] Generating Queries...", total=1000)
task2 = progress.add_task("[green] Inserting Data...", total=iterations)
task3 = progress.add_task("[cyan] Finishing...", total=1000)
for _ in range(1000):
progress.update(task1, advance=1.0)
for query in generate_insert_query(df, table, schema, batch_size):
connection.execute(query)
for query in generate_insert_query(
df, table, schema, batch_size, dialect=dialect
):
try:
connection.execute(query)
except Exception as e:
lg.log.warning("Query Execution Failed")
lg.log.error(e)
print(query.sql)
progress.update(task2, advance=1)
for i in range(1000):
progress.update(task3, advance=1.0)
18 changes: 14 additions & 4 deletions pysqltools/src/sql/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import pandas as pd
import sqlparse

from pysqltools.src.sql.query import Query

from .constants import TYPE_MAPPING
from .insert import insert_pandas

Expand All @@ -22,7 +24,11 @@ def __init__(self, table: str, schema: Union[str, None] = None) -> None:
self.table = f"{schema}.{table}"

def create_from_df(
self, df: pd.DataFrame, insert_data: bool = False, **insert_kwargs: Any
self,
df: pd.DataFrame,
execute: bool = False,
insert_data: bool = False,
**insert_kwargs: Any,
) -> str:
"""
Get the SQL statement to create a SQL table based on a Pandas DataFrame. If the insert_data argument is set to True,
Expand All @@ -41,12 +47,14 @@ def create_from_df(
),
)
)
sql = f"CREATE TABLE {self.table} ( "
sql = f"CREATE TABLE IF NOT EXISTS {self.table} ( "
for k, v in columns.items():
sql += f"{k} {v}, "
sql = sql[:-2] + " )"
if not insert_data:
return sqlparse.format(sql, encoding="utf-8")
if execute:
insert_kwargs["connection"].execute(Query(sql))
if "batch_size" in insert_kwargs:
batch_size = insert_kwargs["batch_size"]
else:
Expand All @@ -57,8 +65,10 @@ def create_from_df(
connection=insert_kwargs["connection"],
table=self.table,
batch_size=batch_size,
dialect=insert_kwargs["dialect"],
)
except TypeError:
except TypeError as e:
raise TypeError(
"Please include the insert arguments into the create_table_from_df method"
"Please include the insert arguments into the create_table_from_df method",
e,
)
Empty file removed pysqltools/¡
Empty file.
Loading

0 comments on commit 82120d9

Please sign in to comment.