Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ENH: Implement option 'truncate' of argument if_exists in DataFrame.to_sql #59391

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions doc/source/whatsnew/v3.0.0.rst
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ Other enhancements
- :meth:`DataFrame.agg` called with ``axis=1`` and a ``func`` which relabels the result index now raises a ``NotImplementedError`` (:issue:`58807`).
- :meth:`Index.get_loc` now accepts also subclasses of ``tuple`` as keys (:issue:`57922`)
- :meth:`Styler.set_tooltips` provides alternative method to storing tooltips by using title attribute of td elements. (:issue:`56981`)
- Added ``"truncate"`` option to the ``if_exists`` argument of :meth:`DataFrame.to_sql`, which truncates an existing table before inserting data (:issue:`37210`).
- Added missing parameter ``weights`` in :meth:`DataFrame.plot.kde` for the estimation of the PDF (:issue:`59337`)
- Allow dictionaries to be passed to :meth:`pandas.Series.str.replace` via ``pat`` parameter (:issue:`51748`)
- Support passing a :class:`Series` input to :func:`json_normalize` that retains the :class:`Series` :class:`Index` (:issue:`51452`)
Expand Down
66 changes: 55 additions & 11 deletions pandas/io/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -736,7 +736,7 @@ def to_sql(
name: str,
con,
schema: str | None = None,
if_exists: Literal["fail", "replace", "append"] = "fail",
if_exists: Literal["fail", "replace", "append", "truncate"] = "fail",
index: bool = True,
index_label: IndexLabel | None = None,
chunksize: int | None = None,
Expand All @@ -762,10 +762,12 @@ def to_sql(
schema : str, optional
Name of SQL schema in database to write to (if database flavor
supports this). If None, use default schema (default).
if_exists : {'fail', 'replace', 'append'}, default 'fail'
if_exists : {'fail', 'replace', 'append', 'truncate'}, default 'fail'
- fail: If table exists, do nothing.
- replace: If table exists, drop it, recreate it, and insert data.
- append: If table exists, insert data. Create if does not exist.
- truncate: If table exists, truncate it. Create if does not exist.
Raises NotImplementedError if 'TRUNCATE TABLE' is not supported
index : bool, default True
Write DataFrame index as a column.
index_label : str or sequence, optional
Expand Down Expand Up @@ -816,7 +818,7 @@ def to_sql(
`sqlite3 <https://docs.python.org/3/library/sqlite3.html#sqlite3.Cursor.rowcount>`__ or
`SQLAlchemy <https://docs.sqlalchemy.org/en/14/core/connections.html#sqlalchemy.engine.BaseCursorResult.rowcount>`__
""" # noqa: E501
if if_exists not in ("fail", "replace", "append"):
if if_exists not in ("fail", "replace", "append", "truncate"):
raise ValueError(f"'{if_exists}' is not valid for if_exists")

if isinstance(frame, Series):
Expand Down Expand Up @@ -924,7 +926,7 @@ def __init__(
pandas_sql_engine,
frame=None,
index: bool | str | list[str] | None = True,
if_exists: Literal["fail", "replace", "append"] = "fail",
if_exists: Literal["fail", "replace", "append", "truncate"] = "fail",
prefix: str = "pandas",
index_label=None,
schema=None,
Expand Down Expand Up @@ -972,11 +974,13 @@ def create(self) -> None:
if self.exists():
if self.if_exists == "fail":
raise ValueError(f"Table '{self.name}' already exists.")
if self.if_exists == "replace":
elif self.if_exists == "replace":
self.pd_sql.drop_table(self.name, self.schema)
self._execute_create()
elif self.if_exists == "append":
pass
elif self.if_exists == "truncate":
self.pd_sql.truncate_table(self.name, self.schema)
else:
raise ValueError(f"'{self.if_exists}' is not valid for if_exists")
else:
Expand Down Expand Up @@ -1468,7 +1472,7 @@ def to_sql(
self,
frame,
name: str,
if_exists: Literal["fail", "replace", "append"] = "fail",
if_exists: Literal["fail", "replace", "append", "truncate"] = "fail",
index: bool = True,
index_label=None,
schema=None,
Expand Down Expand Up @@ -1854,7 +1858,7 @@ def prep_table(
self,
frame,
name: str,
if_exists: Literal["fail", "replace", "append"] = "fail",
if_exists: Literal["fail", "replace", "append", "truncate"] = "fail",
index: bool | str | list[str] | None = True,
index_label=None,
schema=None,
Expand Down Expand Up @@ -1931,7 +1935,7 @@ def to_sql(
self,
frame,
name: str,
if_exists: Literal["fail", "replace", "append"] = "fail",
if_exists: Literal["fail", "replace", "append", "truncate"] = "fail",
index: bool = True,
index_label=None,
schema: str | None = None,
Expand All @@ -1949,10 +1953,12 @@ def to_sql(
frame : DataFrame
name : string
Name of SQL table.
if_exists : {'fail', 'replace', 'append'}, default 'fail'
if_exists : {'fail', 'replace', 'append', 'truncate'}, default 'fail'
- fail: If table exists, do nothing.
- replace: If table exists, drop it, recreate it, and insert data.
- append: If table exists, insert data. Create if does not exist.
- truncate: If table exists, truncate it. Create if does not exist.
Raises NotImplementedError if 'TRUNCATE TABLE' is not supported
index : boolean, default True
Write DataFrame index as a column.
index_label : string or sequence, default None
Expand Down Expand Up @@ -2049,6 +2055,26 @@ def drop_table(self, table_name: str, schema: str | None = None) -> None:
self.get_table(table_name, schema).drop(bind=self.con)
self.meta.clear()

def truncate_table(self, table_name: str, schema: str | None = None) -> None:
    """
    Remove every row from an existing table with ``TRUNCATE TABLE``.

    Nothing happens when the table does not exist.

    Parameters
    ----------
    table_name : str
        Name of the table to truncate.
    schema : str, optional
        Schema holding the table. Falls back to ``self.meta.schema`` when
        not given.

    Raises
    ------
    NotImplementedError
        If the statement fails with a SQLAlchemy ``OperationalError``, which
        is taken to mean the database has no ``TRUNCATE TABLE`` support.
    """
    from sqlalchemy.exc import OperationalError

    schema = schema or self.meta.schema

    if self.has_table(table_name, schema):
        # Reflect the table first; presumably get_table() reads it back from
        # the reflected metadata.
        self.meta.reflect(
            bind=self.con, only=[table_name], schema=schema, views=True
        )
        with self.run_transaction():
            table = self.get_table(table_name, schema)
            # Render the identifier through the dialect so the statement is
            # schema-qualified and quoted. Interpolating the bare table name
            # would drop the schema and break on reserved or mixed-case names.
            # NOTE(review): assumes self.con is a SQLAlchemy Connection that
            # exposes ``dialect`` -- confirm against __init__.
            preparer = self.con.dialect.identifier_preparer
            try:
                self.execute(f"TRUNCATE TABLE {preparer.format_table(table)}")
            except OperationalError as exc:
                raise NotImplementedError(
                    "'TRUNCATE TABLE' is not supported by this database."
                ) from exc

        # Discard the reflected metadata once the table has been emptied.
        self.meta.clear()

def _create_sql_schema(
self,
frame: DataFrame,
Expand Down Expand Up @@ -2306,7 +2332,7 @@ def to_sql(
self,
frame,
name: str,
if_exists: Literal["fail", "replace", "append"] = "fail",
if_exists: Literal["fail", "replace", "append", "truncate"] = "fail",
index: bool = True,
index_label=None,
schema: str | None = None,
Expand All @@ -2328,6 +2354,8 @@ def to_sql(
- fail: If table exists, do nothing.
- replace: If table exists, drop it, recreate it, and insert data.
- append: If table exists, insert data. Create if does not exist.
- truncate: If table exists, truncate it. Create if does not exist.
Raises NotImplementedError if 'TRUNCATE TABLE' is not supported
index : boolean, default True
Write DataFrame index as a column.
index_label : string or sequence, default None
Expand All @@ -2345,6 +2373,8 @@ def to_sql(
engine : {'auto', 'sqlalchemy'}, default 'auto'
Raises NotImplementedError if not set to 'auto'
"""
from adbc_driver_manager import ProgrammingError

if index_label:
raise NotImplementedError(
"'index_label' is not implemented for ADBC drivers"
Expand Down Expand Up @@ -2378,6 +2408,15 @@ def to_sql(
cur.execute(f"DROP TABLE {table_name}")
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wondering if we should use mode=replace instead of running a manual DROP TABLE statement. I can open a separate PR for that change if it makes sense...

elif if_exists == "append":
mode = "append"
elif if_exists == "truncate":
mode = "append"
with self.con.cursor() as cur:
try:
cur.execute(f"TRUNCATE TABLE {table_name}")
except ProgrammingError as exc:
raise NotImplementedError(
"'TRUNCATE TABLE' is not supported by this database."
) from exc

import pyarrow as pa

Expand Down Expand Up @@ -2779,10 +2818,12 @@ def to_sql(
frame: DataFrame
name: string
Name of SQL table.
if_exists: {'fail', 'replace', 'append'}, default 'fail'
if_exists: {'fail', 'replace', 'append', 'truncate'}, default 'fail'
fail: If table exists, do nothing.
replace: If table exists, drop it, recreate it, and insert data.
append: If table exists, insert data. Create if it does not exist.
truncate: If table exists, truncate it. Create if does not exist.
Raises NotImplementedError if 'TRUNCATE TABLE' is not supported
index : bool, default True
Write DataFrame index as a column
index_label : string or sequence, default None
Expand Down Expand Up @@ -2858,6 +2899,9 @@ def drop_table(self, name: str, schema: str | None = None) -> None:
drop_sql = f"DROP TABLE {_get_valid_sqlite_name(name)}"
self.execute(drop_sql)

def truncate_table(self, name: str, schema: str | None = None) -> None:
raise NotImplementedError("'TRUNCATE TABLE' is not supported by this database.")

def _create_sql_schema(
self,
frame,
Expand Down
60 changes: 58 additions & 2 deletions pandas/tests/io/test_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -1063,12 +1063,27 @@ def test_to_sql(conn, method, test_frame1, request):


@pytest.mark.parametrize("conn", all_connectable)
@pytest.mark.parametrize("mode, num_row_coef", [("replace", 1), ("append", 2)])
@pytest.mark.parametrize(
"mode, num_row_coef", [("replace", 1), ("append", 2), ("truncate", 1)]
)
def test_to_sql_exist(conn, mode, num_row_coef, test_frame1, request):
connections_without_truncate = sqlite_connectable + [
"sqlite_buildin",
"sqlite_adbc_conn",
]
if conn in connections_without_truncate and mode == "truncate":
context = pytest.raises(
NotImplementedError,
match="'TRUNCATE TABLE' is not supported by this database.",
)
else:
context = contextlib.nullcontext()
conn = request.getfixturevalue(conn)

with pandasSQL_builder(conn, need_transaction=True) as pandasSQL:
pandasSQL.to_sql(test_frame1, "test_frame", if_exists="fail")
pandasSQL.to_sql(test_frame1, "test_frame", if_exists=mode)
with context:
pandasSQL.to_sql(test_frame1, "test_frame", if_exists=mode)
assert pandasSQL.has_table("test_frame")
assert count_rows(conn, "test_frame") == num_row_coef * len(test_frame1)

Expand Down Expand Up @@ -2693,6 +2708,47 @@ def test_drop_table(conn, request):
assert not insp.has_table("temp_frame")


# Verify that SQLDatabase.truncate_table leaves an existing table with zero
# rows. Parametrized over the MySQL and PostgreSQL connection fixtures.
@pytest.mark.parametrize("conn", mysql_connectable + postgresql_connectable)
def test_truncate_table_success(conn, test_frame1, request):
table_name = "temp_frame"
# ``conn`` arrives as a fixture name; resolve it to the fixture's value.
conn = request.getfixturevalue(conn)

with sql.SQLDatabase(conn) as pandasSQL:
with pandasSQL.run_transaction():
# Seed the table. NOTE(review): the literal 4 is presumably
# len(test_frame1) -- confirm against the fixture.
assert pandasSQL.to_sql(test_frame1, table_name, if_exists="replace") == 4

with pandasSQL.run_transaction():
pandasSQL.truncate_table(table_name)
# After truncation the table must hold no rows.
assert count_rows(conn, table_name) == 0


# Verify that SQLDatabase.truncate_table raises NotImplementedError on the
# SQLite connection fixtures and that the seeded rows are still counted.
@pytest.mark.parametrize("conn", sqlite_connectable)
def test_truncate_table_not_supported(conn, test_frame1, request):
table_name = "temp_frame"
# ``conn`` arrives as a fixture name; resolve it to the fixture's value.
conn = request.getfixturevalue(conn)

with sql.SQLDatabase(conn) as pandasSQL:
with pandasSQL.run_transaction():
# Seed the table. NOTE(review): the literal 4 is presumably
# len(test_frame1) -- confirm against the fixture.
assert pandasSQL.to_sql(test_frame1, table_name, if_exists="replace") == 4

with pandasSQL.run_transaction():
with pytest.raises(
NotImplementedError,
match="'TRUNCATE TABLE' is not supported by this database.",
):
pandasSQL.truncate_table(table_name)
# The failed truncate must leave the row count unchanged.
# NOTE(review): indentation is not visible here; if this assert sits inside
# the pytest.raises block it never runs -- confirm it is outside.
assert count_rows(conn, table_name) == len(test_frame1)


# Verify that SQLiteDatabase.truncate_table raises NotImplementedError with
# the expected message when called on the ``sqlite_buildin`` fixture.
def test_truncate_table_sqlite_not_implemented(sqlite_buildin):
with sql.SQLiteDatabase(sqlite_buildin) as pandasSQL:
with pytest.raises(
NotImplementedError,
match="'TRUNCATE TABLE' is not supported by this database.",
):
# No table named "table" is created in this test before the call.
pandasSQL.truncate_table("table")


@pytest.mark.parametrize("conn", all_connectable)
def test_roundtrip(conn, request, test_frame1):
if conn == "sqlite_str":
Expand Down
Loading