Add bulk operations utilities #224
app/tests/src/db/test_bulk_ops.py
Outdated
conn = db_session.connection().connection
# Override mypy, because SQLAlchemy's DBAPICursor type doesn't specify the row_factory attribute, or that it functions as a context manager
with conn.cursor(row_factory=rows.class_row(Number)) as cur:  # type: ignore
Would love to know if there's a better way of doing this. I also considered:
db_client = db.PostgresDBClient()
conn = db_client._engine.raw_connection()
but accessing _engine directly did not feel appropriate (and it doesn't solve the type issue in any case).
hmm, not sure
We could consider adding a raw_connection() method to the client class which does what you suggested. In the docs, mention that unless you're trying to do something very low level (i.e., directly in psycopg), you'll almost never actually want to use it.
I added this and included a comment with that context -- LMK what you think!
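For readers following the thread, here is a minimal sketch of what such a method could look like. The constructor argument and the EngineLike protocol are hypothetical and exist only to keep the example self-contained; the only thing taken from the discussion is delegating to the engine's raw_connection(), which SQLAlchemy's Engine does provide. It does not by itself address the mypy typing issue mentioned above.

```python
from typing import Any, Protocol


class EngineLike(Protocol):
    """Anything exposing raw_connection(), e.g. a SQLAlchemy Engine."""

    def raw_connection(self) -> Any: ...


class PostgresDBClient:
    # Hypothetical constructor: the real client presumably builds its own engine.
    def __init__(self, engine: EngineLike) -> None:
        self._engine = engine

    def raw_connection(self) -> Any:
        """Return the driver-level (DBAPI) connection.

        Only needed for very low-level work, such as calling psycopg
        directly. Almost all code should use a session instead.
        """
        return self._engine.raw_connection()
```

Keeping the access behind a documented public method avoids callers reaching into the private _engine attribute.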
app/tests/src/db/test_bulk_ops.py
Outdated
# Now modify half of the objects
for obj in objects[: int(len(objects) / 2)]:
    obj.num = random.randint(1, 10000)

bulk_ops.bulk_upsert(
    cur,
    table,
    attributes,
    objects,
    constraint,
)
conn.commit()
nit: it'd be nice to have the test case do a combination of inserts and updates rather than just inserts and updates separately
Added -- one round of inserts, then a second round of combo insert + updates
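A rough sketch of the shape of such a test, for illustration only. It uses an in-memory SQLite database and a hypothetical upsert_numbers helper as a stand-in for bulk_ops.bulk_upsert (whose implementation is not shown here), so the table name, columns, and helper are all assumptions. The point is the second round, which mixes updates to existing rows with brand-new rows in a single call.

```python
import random
import sqlite3


def upsert_numbers(conn, rows):
    """Stand-in for bulk_ops.bulk_upsert: insert new ids, update existing ones."""
    conn.executemany(
        "INSERT INTO number (id, num) VALUES (?, ?) "
        "ON CONFLICT (id) DO UPDATE SET num = excluded.num",
        rows,
    )
    conn.commit()


def run_mixed_upsert_rounds(seed=0):
    rng = random.Random(seed)
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE number (id INTEGER PRIMARY KEY, num INTEGER)")

    # Round 1: inserts only.
    expected = {i: rng.randint(1, 10000) for i in range(10)}
    upsert_numbers(conn, list(expected.items()))

    # Round 2: update half of the existing rows and add new rows in one call.
    changes = {i: rng.randint(1, 10000) for i in range(5)}
    changes.update({i: rng.randint(1, 10000) for i in range(10, 15)})
    upsert_numbers(conn, list(changes.items()))
    expected.update(changes)

    # Read everything back so the caller can compare against expectations.
    stored = dict(conn.execute("SELECT id, num FROM number"))
    conn.close()
    return stored, expected
```

Comparing the stored rows against the expected mapping covers untouched rows, updated rows, and newly inserted rows in one assertion.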
app/src/db/bulk_ops.py
Outdated
temp_table = f"temp_{table}"
create_temp_table(cur, temp_table=temp_table, src_table=table)
This is probably a very niche edge case, but what would happen if two different processes created temp tables with the same name? Does that cause any issues, or does running inside separate transactions shield them from each other entirely?
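Great question. I tested it locally, and the isolation works like you'd expect: two connections can each create a temp table with the same name without conflicting (Postgres temp tables are only visible to the session that created them). Here's the SQL I ran:
-- In the original connection
BEGIN;
CREATE TEMP TABLE test (id INT) ON COMMIT DROP;
SELECT * FROM test;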
-- In a separate connection!
BEGIN;
CREATE TEMP TABLE test (other INT) ON COMMIT DROP;
SELECT * FROM test;
COMMIT;
-- Back in the original connection
COMMIT;
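The same experiment can be scripted. The sketch below is an analogue only: it uses SQLite from the Python standard library rather than Postgres, so it omits ON COMMIT DROP (which SQLite does not support), and the helper name is hypothetical. It shows two connections to the same database each creating a temp table named test with different columns, and each seeing only its own.

```python
import os
import sqlite3
import tempfile


def temp_table_columns_per_connection():
    """Create a same-named temp table on two connections and report each one's columns."""
    with tempfile.TemporaryDirectory() as tmp_dir:
        db_path = os.path.join(tmp_dir, "shared.db")
        first = sqlite3.connect(db_path)
        second = sqlite3.connect(db_path)
        try:
            # Same table name, different column, one per connection.
            first.execute("CREATE TEMP TABLE test (id INT)")
            second.execute("CREATE TEMP TABLE test (other INT)")
            first_cols = [c[0] for c in first.execute("SELECT * FROM test").description]
            second_cols = [c[0] for c in second.execute("SELECT * FROM test").description]
        finally:
            first.close()
            second.close()
    return first_cols, second_cols
```

Running the equivalent check against Postgres would need a live database and two psycopg connections, which is why it is not attempted here.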
lorenyu
left a comment
looks great. just a nit on the test, don't feel too strongly about it though
Ticket
n/a
Changes
bulk_ops.py, which exposes a bulk_upsert function for efficiently upserting large amounts of data into the database
Context for reviewers
psycopg library. Feedback on how to adapt the code here to the platform's approach is welcome/appreciated.
Testing
make test args="tests/src/db/test_bulk_ops.py"