Adding (Insert or update if key exists) option to .to_sql
#14553
Comments
This would be nice functionality, but the main problem is that we want it to be database-flavor independent and based on sqlalchemy core (so not sqlalchemy ORM) for inclusion in pandas itself. |
Yeah, I think this is out of scope for pandas since upserts aren't supported by all db engines. |
While an |
@TomAugspurger Could we add the upsert option for supported db engines and throw an error for unsupported db engines? |
I'd like to see this as well. I am caught in between using pure SQL and SQL Alchemy (haven't gotten this to work yet, I think it has something to do with how I pass the dicts). I use psycopg2 COPY to bulk insert, but I would love to use pd.to_sql for tables where values might change over time and I don't mind it inserting a bit slower.
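A minimal sketch of the psycopg2 COPY bulk insert mentioned above (connection string, table, and sample data are illustrative assumptions; COPY only inserts, it does not update existing rows):
import io
import pandas as pd
import psycopg2

conn = psycopg2.connect("dbname=mydb user=me")  # placeholder connection
df = pd.DataFrame({"id": [1, 2], "name": ["a", "b"]})  # assumed data

buf = io.StringIO()
df.to_csv(buf, index=False, header=False)
buf.seek(0)
with conn, conn.cursor() as cur:
    # COPY FROM STDIN is a fast bulk load into the target table
    cur.copy_expert("COPY person_age (id, name) FROM STDIN WITH (FORMAT csv)", buf)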
And pure SQL:
|
@ldacey this style worked for me (insert_statement.excluded is an alias to the row of data that violated the constraint):
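A minimal sketch of that style with SQLAlchemy core (the table name, key column, and data are illustrative assumptions, not the original poster's code):
import pandas as pd
from sqlalchemy import create_engine, MetaData, Table
from sqlalchemy.dialects.postgresql import insert

engine = create_engine("postgresql://user:pass@host/db")  # placeholder
meta = MetaData()
my_table = Table("person_age", meta, autoload_with=engine)  # assumed existing table
df = pd.DataFrame({"id": [2, 3], "age": [44, 13]})  # assumed data

insert_statement = insert(my_table).values(df.to_dict(orient="records"))
upsert_statement = insert_statement.on_conflict_do_update(
    index_elements=["id"],  # assumed primary-key column
    # insert_statement.excluded refers to the row that would have violated the constraint
    set_={"age": insert_statement.excluded["age"]},
)
with engine.begin() as conn:
    conn.execute(upsert_statement)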
|
@cdagnino This snippet might not work in the case of composite keys, that scenario has to be taken care of also. I'll try to find a way to do the same |
Pandas DataFrame.to_sql method has a limitation of not being able to "insert or replace" records, see e.g.: pandas-dev/pandas#14553. Using pandas.io.sql primitives, however, it's not too hard to implement such functionality (for the SQLite case only). Assuming that index columns of the frame have names, this method will use those columns as the PRIMARY KEY of the table.
One way to solve this update issue is to use sqlalchemy's bulk_update_mappings. This function takes in a list of dictionary values and updates each row based on the table's primary key.
session.bulk_update_mappings(
    Table,
    pandas_df.to_dict(orient='records')
) |
I agree with @neilfrndes; we shouldn't let a nice feature like this go unimplemented just because some DBs don't support it. Is there any chance this feature might happen? |
Probably, if someone makes a PR. On further consideration, I don't think I'm opposed to this on the principle that some databases don't support it. However, I'm not too familiar with the sql code, so I'm not sure what the best approach is. |
One possibility is to provide some examples for upserts using the `method` argument. For postgres that would look something like (untested):
from sqlalchemy.dialects import postgresql

def pg_upsert(table, conn, keys, data_iter):
    for row in data_iter:
        row_dict = dict(zip(keys, row))
        stmt = postgresql.insert(table).values(**row_dict)
        upsert_stmt = stmt.on_conflict_do_update(
            index_elements=table.index,
            set_=row_dict)
        conn.execute(upsert_stmt)
Something similar could be done for mysql. |
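A usage sketch for a callable in the style of pg_upsert above, passed through the existing method hook of to_sql (table name and connection string are assumptions):
import pandas as pd
from sqlalchemy import create_engine

engine = create_engine("postgresql://user:pass@host/db")  # placeholder
df = pd.DataFrame({"id": [1, 2], "name": ["a", "b"]})  # assumed data
df.to_sql("my_table", engine, if_exists="append", index=False, method=pg_upsert)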
For postgres I am using execute_values. In my case, my query is a jinja2 template to flag whether I should do update set or do nothing. This has been quite quick and flexible. Not as fast as using COPY or copy_expert but it works well.
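A minimal sketch of that execute_values pattern, assuming psycopg2 and an illustrative table and key (the jinja2 templating is omitted and the ON CONFLICT clause is hard-coded here):
import pandas as pd
import psycopg2
from psycopg2.extras import execute_values

conn = psycopg2.connect("dbname=mydb user=me")  # placeholder connection
df = pd.DataFrame({"id": [2, 3], "name": ["joe", "ann"], "age": [44, 30]})  # assumed data

query = """
    INSERT INTO person_age (id, name, age)
    VALUES %s
    ON CONFLICT (id)
    DO UPDATE SET name = EXCLUDED.name, age = EXCLUDED.age;
"""
rows = [tuple(r) for r in df.itertuples(index=False)]
with conn, conn.cursor() as cur:
    # execute_values expands the single %s into one multi-row VALUES clause
    execute_values(cur, query, rows)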
|
@danich1 can you please give an example of how this would work? I tried to have a look into bulk_update_mappings but I got really lost and couldn't make it work. |
@cristianionescu92 An example would be this:
I have a pandas data frame with the same columns but updated values:
Let's also assume that we have a session variable open to access the database. By calling this method:
session.bulk_update_mappings(
    User,
    <pandas dataframe above>.to_dict(orient='records')
)
Pandas will convert the dataframe into a list of dictionaries [{id: 0, name: "chris"}, {id: 1, name: "james"}] that sql will use to update the rows of the table. So the final table will look like:
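A self-contained sketch of that flow (the model, engine, and data values are illustrative assumptions):
import pandas as pd
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import declarative_base, Session

Base = declarative_base()

class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    name = Column(String)

engine = create_engine("sqlite:///:memory:")  # placeholder engine
Base.metadata.create_all(engine)

df = pd.DataFrame({"id": [0, 1], "name": ["chris", "james"]})  # the updated values

with Session(engine) as session:
    # Seed some existing rows so the update has something to match
    session.add_all([User(id=0, name="charles"), User(id=1, name="jim")])
    session.commit()
    # Each dict must contain the primary key plus the columns to update
    session.bulk_update_mappings(User, df.to_dict(orient="records"))
    session.commit()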
|
Hi @danich1, and thanks a lot for your response. I figured out the mechanics of how the update would work myself. Unfortunately I don't know how to work with a session; I am quite a beginner. Let me show you what I am doing:
import pypyodbc
The above code basically excludes from a dataframe the rows which I already have in SQL and only inserts the new rows. What I need is to update the rows which already exist. Can you please help me understand what I should do next? |
Motivation for a better TO_SQL
The alternative I've seen among more experienced users is to stop using pandas at this stage, and this tends to propagate upstream and makes the pandas package lose retention among experienced users. Is this the direction Pandas wants to go? I understand we want to_sql to remain database agnostic as much as possible, and use core sqlalchemy. A method that truncates or deletes instead of a true upsert would still add a lot of value though.
Integration with Pandas product vision
I'd happily contribute to either the core pandas functionality, or failing that, documentation on solutions / best practices for how to achieve upsert functionality within Pandas, such as the below: What is the preferred way forward for Pandas core dev / product managers? |
I think we're open to an implementation that's engine-specific. The proposal to use |
I have a similar requirement where I want to update existing data in a MySQL table from multiple CSVs over time. I thought I could use df.to_sql() to insert the new data into a newly created temporary table and then run a MySQL query to control how to append/update data in the existing table. MySQL Reference: https://stackoverflow.com/questions/2472229/insert-into-select-from-on-duplicate-key-update?answertab=active#tab-top Disclaimer: I started using Python and Pandas only a few days ago. |
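A sketch of that staging-table approach for MySQL (table, column names, and the CSV path are assumptions; the SQL mirrors the INSERT ... SELECT ... ON DUPLICATE KEY UPDATE pattern from the linked answer):
import pandas as pd
from sqlalchemy import create_engine, text

engine = create_engine("mysql+pymysql://user:pass@host/db")  # placeholder
df = pd.read_csv("new_data.csv")  # assumed CSV with columns id, name, age

with engine.begin() as con:
    # Stage the incoming rows in a temporary table with the target's structure
    con.execute(text("CREATE TEMPORARY TABLE staging LIKE person_age;"))
    df.to_sql("staging", con, if_exists="append", index=False)
    # Insert new rows; update existing ones when the primary key already exists
    con.execute(text("""
        INSERT INTO person_age (id, name, age)
        SELECT id, name, age FROM staging
        ON DUPLICATE KEY UPDATE name = VALUES(name), age = VALUES(age);
    """))
    con.execute(text("DROP TEMPORARY TABLE staging;"))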
Hey Pandas folk: I have had this same issue, needing to frequently update my local database with records that I ultimately load and manipulate in pandas. I built a simple library to do this - it’s basically a stand-in for df.to_sql and pd.read_sql_table that uses the DataFrame index as a primary key by default. Uses sqlalchemy core only. https://pypi.org/project/pandabase/0.2.1/ This tool is fairly opinionated, probably not appropriate to include in Pandas as-is. But for my specific use case it solves the problem... if there is interest in massaging this to make it fit in Pandas I am happy to help. For now, the following works (in the limited case of current-ish pandas and sqlalchemy, named index as primary key, SQLite or Postgres back end, and supported datatypes): pip install pandabase / pandabase.to_sql(df, table_name, con_string, how='upsert') |
Working on a general solution to this with cvonsteg. Planning to come back with a proposed design in October. |
@TomAugspurger as suggested, @rugg2 and I have come up with the following design proposal for an upsert.
Interface Proposal
Two new values to be added as possible arguments for the method parameter:
import pandas as pd
from sqlalchemy import create_engine
engine = create_engine("connection string")
df = pd.DataFrame(...)
df.to_sql(
name='table_name',
con=engine,
if_exists='append',
method='upsert_update' # (or upsert_ignore)
)
Implementation Proposal
To implement this, the insert-method dispatch in pandas.io.sql would be extended along these lines:
def insert(self, chunksize=None, method=None):
    # set insert method
    if method is None:
        exec_insert = self._execute_insert
    elif method == "multi":
        exec_insert = self._execute_insert_multi
    # new upsert methods <<<
    elif method == "upsert_update":
        exec_insert = self.execute_upsert_update
    elif method == "upsert_ignore":
        exec_insert = self.execute_upsert_ignore
    # >>>
    elif callable(method):
        exec_insert = partial(method, self)
    else:
        raise ValueError("Invalid parameter 'method': {}".format(method))
    ...
We propose the following implementation, with rationale outlined in detail below (all points are open for discussion): (1) Engine agnostic using SQLAlchemy core, via an atomic sequence of DELETE and INSERT statements.
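For illustration only (not the proposed patch itself), a delete-then-insert upsert with SQLAlchemy core, run as a single transaction, could look roughly like this; the table, key column, and data are assumptions:
import pandas as pd
from sqlalchemy import create_engine, MetaData, Table

engine = create_engine("sqlite:///example.db")  # placeholder
meta = MetaData()
target = Table("person_age", meta, autoload_with=engine)  # assumed existing table
df = pd.DataFrame({"id": [2, 3], "name": ["joe", "ann"], "age": [44, 30]})  # assumed data

with engine.begin() as conn:  # one transaction, so the delete + insert pair is atomic
    # Delete any target rows whose key appears in the incoming frame
    conn.execute(target.delete().where(target.c["id"].in_(df["id"].tolist())))
    # Re-insert all incoming rows
    conn.execute(target.insert(), df.to_dict(orient="records"))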
|
@TomAugspurger, if the design we put together with @cvonsteg suits you, we will proceed with the implementation in code (incl. tests) and raise a pull request. Let us know if you would like to proceed differently. |
Reading through the proposal is on my todo list. I'm a bit behind on my email right now.
|
I personally don't have anything against it, so I think a PR is welcome. One implementation across all DBMSs using SQLAlchemy core is certainly how this should start if I am reading your points correctly, and the same with just primary keys. Always easier to start small and focused and expand from there |
need this feature badly. |
Any news? Coming from the Java world, I never thought this simple functionality might turn my codebase upside down. |
Hi everyone, I've looked into how upserts are implemented in SQL across dialects and found a number of techniques that can inform design decisions here. But first, I want to warn against using DELETE ... INSERT logic. If there are foreign keys or triggers, other records across the database will end up being deleted or otherwise messed up. In MySQL, REPLACE does the same damage. I've actually created hours of work for myself fixing data because I used REPLACE. So, that said, here are the techniques implemented in SQL:
With wildly varying syntax, I understand the temptation to use DELETE ... INSERT to make the implementation dialect agnostic. But there's another way: we can imitate the logic of the MERGE statement using a temp table and basic INSERT and UPDATE statements. The SQL:2016 MERGE syntax is as follows:
MERGE INTO target_table
USING source_table
ON search_condition
WHEN MATCHED THEN
UPDATE SET col1 = value1, col2 = value2,...
WHEN NOT MATCHED THEN
INSERT (col1,col2,...)
VALUES (value1,value2,...);
(Borrowed from Oracle Tutorial.) Since every dialect supported by SQLAlchemy supports temp tables, a safer, dialect-agnostic approach to doing an upsert would be to, in a single transaction:
1. Create a temp table with the same structure as the target table.
2. Insert the dataframe into the temp table.
3. INSERT rows from the temp table whose keys are not already in the target table.
4. UPDATE the target table's non-key columns from the temp table, joining on the key.
5. Drop the temp table.
Besides being a dialect-agnostic technique, it can also be extended by allowing the end user to choose how to insert, how to update the data, and on what key to join the data. While the syntax of temp tables and update joins might differ slightly between dialects, they should be supported everywhere. Below is a proof of concept I wrote for MySQL:
import uuid
import pandas as pd
from sqlalchemy import create_engine
# This proof of concept uses this sample database
# https://downloads.mysql.com/docs/world.sql.zip
# Arbitrary, unique temp table name to avoid possible collision
source = str(uuid.uuid4()).split('-')[-1]
# Table we're doing our upsert against
target = 'countrylanguage'
db_url = 'mysql://<{user: }>:<{passwd: }>.@<{host: }>/<{db: }>'
df = pd.read_sql(
f'SELECT * FROM `{target}`;',
db_url
)
# Change for UPDATE, 5.3->5.4
df.at[0,'Percentage'] = 5.4
# Change for INSERT
df = df.append(
{'CountryCode': 'ABW','Language': 'Arabic','IsOfficial': 'F','Percentage':0.0},
ignore_index=True
)
# List of PRIMARY or UNIQUE keys
key = ['CountryCode','Language']
# Do all of this in a single transaction
engine = create_engine(db_url)
with engine.begin() as con:
    # Create temp table like target table to stage data for upsert
    con.execute(f'CREATE TEMPORARY TABLE `{source}` LIKE `{target}`;')
    # Insert dataframe into temp table
    df.to_sql(source, con, if_exists='append', index=False, method='multi')
    # INSERT where the key doesn't match (new rows)
    con.execute(f'''
        INSERT INTO `{target}`
        SELECT
            *
        FROM
            `{source}`
        WHERE
            (`{'`, `'.join(key)}`) NOT IN (SELECT `{'`, `'.join(key)}` FROM `{target}`);
    ''')
    # Create a doubled list of tuples of non-key columns to template the update statement
    non_key_columns = [(i, i) for i in df.columns if i not in key]
    # Whitespace for aesthetics
    whitespace = '\n\t\t\t'
    # Do an UPDATE ... JOIN to set all non-key columns of target to equal source
    con.execute(f'''
        UPDATE
            `{target}` `t`
        JOIN
            `{source}` `s` ON `t`.`{"` AND `t`.`".join(["`=`s`.`".join(i) for i in zip(key,key)])}`
        SET
            `t`.`{f"`,{whitespace}`t`.`".join(["`=`s`.`".join(i) for i in non_key_columns])}`;
    ''')
    # Drop our temp table.
    con.execute(f'DROP TABLE `{source}`;')
Here, I make the following assumptions:
Despite the assumptions, I hope my MERGE-inspired technique informs efforts to build a flexible, robust upsert option. |
I think this is useful functionality: however out of scope it may seem, it is intuitive to have such a common feature when adding rows to a table. |
Please reconsider adding this function: it is very useful when adding rows to an existing table. |
Thanks, @GoldstHa - that is really helpful input. I will attempt to create a POC for the MERGE-like implementation |
Given the issues raised with the DELETE + INSERT approach, here is a modified proposal.
Modified Approach Proposal
There have been some good discussions around the API, and how an upsert should actually be called (i.e. via the
|
Has this been implemented? I am really looking forward to it. |
+1 |
@cvonsteg not sure if this is in any way helpful for your approach atm, but I created a method for ourselves internally (waiting till this is available in pandas). Here's the main class, which creates the MERGE statement generically from the dataframe itself. Furthermore, let me know if I can help with anything.
import logging  # needed for the logging.info call below

class SqlUpsert:
    def __init__(self, table_name, schema, id_cols, columns):
        self.table_name = table_name
        self.schema = schema
        self.id_cols = id_cols
        self.columns = [col.strip() for col in columns]

    def create_on_statement(self):
        on = " AND ".join([f"s.{id_col} = t.{id_col}" for id_col in self.id_cols])
        return on

    def create_update_statement(self):
        update = ", ".join(
            [f"t.{col} = s.{col}" for col in self.columns if col not in self.id_cols]
        )
        return update

    def create_insert_statement(self):
        insert = f"({', '.join(self.columns)})"
        values = ", ".join([f"s.{col}" for col in self.columns])
        values = f"({values})"
        return insert, values

    def create_merge_query(self):
        insert = self.create_insert_statement()
        query = f"""
        CREATE PROCEDURE [UPSERT_{self.table_name}]
        AS
        MERGE {self.schema}.{self.table_name} t
        USING staging.{self.table_name} s
        ON {self.create_on_statement()}
        WHEN MATCHED
            THEN UPDATE SET
                {self.create_update_statement()}
        WHEN NOT MATCHED BY TARGET
            THEN INSERT {insert[0]}
            VALUES {insert[1]};
        """
        logging.info(query)
        return query |
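A usage sketch for the class above (the SQL Server connection, schema names, and data are assumptions; the generated MERGE reads from a staging.<table_name> table, so the dataframe is staged there first):
import pandas as pd
from sqlalchemy import create_engine, text

engine = create_engine("mssql+pyodbc://user:pass@my_dsn")  # placeholder SQL Server connection
df = pd.DataFrame({"id": [1, 2], "name": ["a", "b"]})  # assumed data

# Stage the incoming rows where the MERGE expects them
df.to_sql("person_age", engine, schema="staging", if_exists="replace", index=False)

upsert = SqlUpsert("person_age", "dbo", id_cols=["id"], columns=df.columns)
with engine.begin() as con:
    con.execute(text(upsert.create_merge_query()))  # creates the UPSERT_person_age procedure
    con.execute(text("EXEC [UPSERT_person_age];"))  # run it to merge staging into dbo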
Thank you for input @erfannariman - I've been a bit busy moving house, but will get to looking into that asap. |
@cvonsteg, if you need any help, please let me know. |
Is this feature released? I need it badly |
this is open and there is at least one (closed) PR; we would need a complete and fully tested solution |
this feature would greatly help; would appreciate any updates if it is going to be available soon. |
I am subscribing for updates, and to add to the list of people who need this. |
On behalf of my class in data analysis, as a strong supporter of Pandas, I strongly support this new functionality :). The workarounds are kind of ugly across different DBs. |
Also posting to follow. I can help with the code if there's WIP. |
This would be a great feature! |
Thank you all for the offers of help - the PR is here. I've re-worked it and functionally it should all work now, however the build still fails due to some house-keeping (and seemingly some unrelated tests). Please feel free to have a look and offer any suggestions or feedback which you can think of! |
waiting for ... |
I just developed this package: https://github.com/MayasMess/pandas-oop |
Arriving late to the party. While this PR is still waiting to be merged, does anyone have an implementation of the callable method that works with MSSQL? |
@rafagsiqueira I modified code similar to this for PostgreSQL, not sure if it will help, but just passing it along: |
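Since the snippet itself is not shown above, here is one hedged sketch of a PostgreSQL method= callable in that spirit (all names are assumptions, not the commenter's actual code); it derives the conflict columns from the table's primary key:
from sqlalchemy.dialects.postgresql import insert

def postgres_upsert(table, conn, keys, data_iter):
    # `table` is pandas' SQLTable wrapper; `table.table` is the underlying SQLAlchemy Table
    sa_table = table.table
    pk_cols = [c.name for c in sa_table.primary_key.columns]
    rows = [dict(zip(keys, row)) for row in data_iter]
    stmt = insert(sa_table).values(rows)
    update_cols = {
        c.name: stmt.excluded[c.name]
        for c in sa_table.columns
        if c.name not in pk_cols
    }
    stmt = stmt.on_conflict_do_update(index_elements=pk_cols, set_=update_cols)
    conn.execute(stmt)

# Usage sketch: df.to_sql("person_age", engine, if_exists="append", index=False, method=postgres_upsert)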
Suppose you have an existing SQL table called person_age, where id is the primary key, and you also have new data in a DataFrame called extra_data. Then it would be useful to have an option on extra_data.to_sql() that allows passing the DataFrame to SQL with an INSERT or UPDATE option on the rows, based on the primary key. In this case, the id=2 row would get updated to age=44 and the id=3 row would get added.
Expected Output
(Maybe) helpful code references
merge from SQLAlchemy? I looked at the pandas sql.py source code to come up with a solution, but I couldn't follow.
Code to replicate the example above
(Apologies for mixing sqlalchemy and sqlite)
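As the original replication code is not preserved above, here is a hedged sketch of a setup matching the description (the sample ages other than the id=2 -> 44 update are assumptions):
import pandas as pd
from sqlalchemy import create_engine, text

engine = create_engine("sqlite:///:memory:")

# Existing table person_age with id as the primary key (sample ages assumed)
with engine.begin() as con:
    con.execute(text("CREATE TABLE person_age (id INTEGER PRIMARY KEY, age INTEGER)"))
    con.execute(text("INSERT INTO person_age (id, age) VALUES (1, 18), (2, 42)"))

# New data: id=2 should be updated to age=44, id=3 should be inserted
extra_data = pd.DataFrame({"id": [2, 3], "age": [44, 13]})

# The requested behaviour would be something along the lines of the proposal above, e.g.:
# extra_data.to_sql("person_age", engine, if_exists="append", index=False, method="upsert_update")  # hypothetical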