Parametrized Update/Delete support. (#177)
Progress on #81

This PR adds the capability to run parametrized queries, and connects it up for Updates (`.save()`) and Deletes.
(Not queryset updates; that is a separate issue.)

This is a step towards hardening Tortoise ORM. It also SIGNIFICANTLY improves performance for full Updates, partial Updates and Deletes.
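For orientation, a rough usage sketch of the operations this change parametrizes. The `Event` model and the in-memory SQLite setup are illustrative only and not part of the diff:

```python
# Hypothetical model and setup, only to show which calls now run parametrized SQL.
from tortoise import Tortoise, fields
from tortoise.models import Model


class Event(Model):
    id = fields.IntField(pk=True)
    name = fields.TextField()


async def demo() -> None:
    await Tortoise.init(db_url="sqlite://:memory:", modules={"models": ["__main__"]})
    await Tortoise.generate_schemas()

    event = await Event.create(name="setup")
    event.name = "updated"
    await event.save()                        # full update, now parametrized
    await event.save(update_fields=["name"])  # partial update, now parametrized
    await event.delete()                      # delete, now parametrized

    await Tortoise.close_connections()
```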
grigi authored Sep 1, 2019
1 parent ea91a71 commit 0e2cf36
Showing 12 changed files with 110 additions and 67 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.rst
@@ -2,6 +2,16 @@

Changelog
=========
0.13.2
------
* Security fixes for ``«model».save()`` & ``«model».delete()``:

  These operations are now fully parametrized and are no longer susceptible to escaping issues.

  - Simple update is now ~3-6× faster
  - Partial update is now ~3× faster
  - Delete is now ~2.7× faster

0.13.1
------
* Model schema now has a discovery API:
2 changes: 1 addition & 1 deletion README.rst
@@ -44,7 +44,7 @@ Hence we started Tortoise ORM.

Tortoise ORM is designed to be functional, yet familiar, to ease the migration of developers wishing to switch to ``asyncio``.

-It also performs well when compared to other Python ORMS, only ever losing to Pony ORM:
+It also performs well when compared to other Python ORMs, only losing to Pony ORM:

.. image:: docs/ORM_Perf.png
:target: https://github.com/tortoise/orm-benchmarks
Binary file modified docs/ORM_Perf.png
2 changes: 1 addition & 1 deletion docs/index.rst
@@ -30,7 +30,7 @@ Hence we started Tortoise ORM.

Tortoise ORM is designed to be functional, yet familiar, to ease the migration of developers wishing to switch to ``asyncio``.

-It also performs well when compared to other Python ORMS, only ever losing to Pony ORM:
+It also performs well when compared to other Python ORMs, only losing to Pony ORM:

.. image:: ORM_Perf.png
:target: https://github.com/tortoise/orm-benchmarks
11 changes: 8 additions & 3 deletions tortoise/backends/asyncpg/client.py
@@ -171,10 +171,15 @@ async def execute_many(self, query: str, values: list) -> None:

    @translate_exceptions
    @retry_connection
-   async def execute_query(self, query: str) -> List[dict]:
+   async def execute_query(self, query: str, values: Optional[list] = None) -> List[dict]:
        async with self.acquire_connection() as connection:
-           self.log.debug(query)
-           return await connection.fetch(query)
+           self.log.debug("%s: %s", query, values)
+           if values:
+               # TODO: Cache prepared statement
+               stmt = await connection.prepare(query)
+               return await stmt.fetch(*values)
+           else:
+               return await connection.fetch(query)

    @translate_exceptions
    @retry_connection
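A hedged sketch of how the new `values` argument is exercised on the PostgreSQL backend. The `"default"` connection name and the `event` table are invented for illustration; in practice the executors make this call rather than user code:

```python
# Hypothetical direct call against an already-initialised asyncpg-backed client.
# Placeholders use asyncpg's positional "$n" style; `values` is bound through a
# prepared statement, so the driver handles quoting and escaping.
from tortoise import Tortoise


async def fetch_event(event_id: int) -> list:
    conn = Tortoise.get_connection("default")  # assumes a connection named "default"
    return await conn.execute_query(
        'SELECT "id", "name" FROM "event" WHERE "id" = $1', [event_id]
    )
```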
5 changes: 4 additions & 1 deletion tortoise/backends/asyncpg/executor.py
@@ -10,11 +10,14 @@
class AsyncpgExecutor(BaseExecutor):
    EXPLAIN_PREFIX = "EXPLAIN (FORMAT JSON, VERBOSE)"

+   def Parameter(self, pos: int) -> Parameter:
+       return Parameter("$%d" % (pos + 1,))
+
    def _prepare_insert_statement(self, columns: List[str]) -> str:
        query = (
            self.db.query_class.into(Table(self.model._meta.table))
            .columns(*columns)
-           .insert(*[Parameter("$%d" % (i + 1,)) for i in range(len(columns))])
+           .insert(*[self.Parameter(i) for i in range(len(columns))])
        )
        generated_fields = self.model._meta.generated_db_fields
        if generated_fields:
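For illustration, a standalone pypika sketch (not the executor itself) of the insert statement this now renders, with positional `$n` placeholders instead of inlined values; the `event` table and its columns are invented:

```python
# Standalone sketch of the SQL shape produced via the new Parameter() hook.
from pypika import Parameter, PostgreSQLQuery, Table

columns = ["name", "modified"]
table = Table("event")
query = (
    PostgreSQLQuery.into(table)
    .columns(*columns)
    .insert(*[Parameter("$%d" % (i + 1,)) for i in range(len(columns))])
)
print(query.get_sql())
# e.g. INSERT INTO "event" ("name","modified") VALUES ($1,$2)
```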
4 changes: 2 additions & 2 deletions tortoise/backends/base/client.py
@@ -1,5 +1,5 @@
import logging
-from typing import Any, Sequence
+from typing import Any, Optional, Sequence

from pypika import Query

@@ -90,7 +90,7 @@ def _in_transaction(self) -> "BaseTransactionWrapper":
    async def execute_insert(self, query: str, values: list) -> Any:
        raise NotImplementedError()  # pragma: nocoverage

-   async def execute_query(self, query: str) -> Sequence[dict]:
+   async def execute_query(self, query: str, values: Optional[list] = None) -> Sequence[dict]:
        raise NotImplementedError()  # pragma: nocoverage

    async def execute_script(self, query: str) -> None:
105 changes: 69 additions & 36 deletions tortoise/backends/base/executor.py
@@ -2,7 +2,7 @@
from functools import partial
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Set, Tuple, Type # noqa

-from pypika import JoinType, Table
+from pypika import JoinType, Parameter, Table

from tortoise import fields
from tortoise.exceptions import OperationalError
@@ -11,7 +11,7 @@
if TYPE_CHECKING:  # pragma: nocoverage
    from tortoise.models import Model

-INSERT_CACHE = {}  # type: Dict[str, Tuple[list, str, Dict[str, Callable]]]
+EXECUTOR_CACHE = {}  # type: Dict[str, Tuple[list, str, Dict[str, Callable], str, Dict[str, str]]]


class BaseExecutor:
@@ -26,9 +26,9 @@ def __init__(self, model, db=None, prefetch_map=None, prefetch_queries=None):
        self._prefetch_queries = prefetch_queries if prefetch_queries else {}

        key = "{}:{}".format(self.db.connection_name, self.model._meta.table)
-       if key not in INSERT_CACHE:
+       if key not in EXECUTOR_CACHE:
            self.regular_columns, columns = self._prepare_insert_columns()
-           self.query = self._prepare_insert_statement(columns)
+           self.insert_query = self._prepare_insert_statement(columns)

            self.column_map = {}  # type: Dict[str, Callable]
            for column in self.regular_columns:
@@ -39,9 +39,29 @@ def __init__(self, model, db=None, prefetch_map=None, prefetch_queries=None):
                    func = field_object.to_db_value
                self.column_map[column] = func

-           INSERT_CACHE[key] = self.regular_columns, self.query, self.column_map
+           table = Table(self.model._meta.table)
+           self.delete_query = str(
+               self.model._meta.basequery.where(
+                   getattr(table, self.model._meta.db_pk_field) == self.Parameter(0)
+               ).delete()
+           )
+           self.update_cache = {}  # type: Dict[str, str]
+
+           EXECUTOR_CACHE[key] = (
+               self.regular_columns,
+               self.insert_query,
+               self.column_map,
+               self.delete_query,
+               self.update_cache,
+           )
        else:
-           self.regular_columns, self.query, self.column_map = INSERT_CACHE[key]
+           (
+               self.regular_columns,
+               self.insert_query,
+               self.column_map,
+               self.delete_query,
+               self.update_cache,
+           ) = EXECUTOR_CACHE[key]

    async def execute_explain(self, query) -> Any:
        sql = " ".join((self.EXPLAIN_PREFIX, query.get_sql()))
@@ -78,17 +98,24 @@ def _prepare_insert_statement(self, columns: List[str]) -> str:
        # Insert should implement returning new id to saved object
        # Each db has it's own methods for it, so each implementation should
        # go to descendant executors
-       raise NotImplementedError()  # pragma: nocoverage
+       return str(
+           self.db.query_class.into(Table(self.model._meta.table))
+           .columns(*columns)
+           .insert(*[self.Parameter(i) for i in range(len(columns))])
+       )

    async def _process_insert_result(self, instance: "Model", results: Any):
        raise NotImplementedError()  # pragma: nocoverage

+   def Parameter(self, pos: int) -> Parameter:
+       raise NotImplementedError()  # pragma: nocoverage
+
    async def execute_insert(self, instance):
        values = [
            self.column_map[column](getattr(instance, column), instance)
            for column in self.regular_columns
        ]
-       insert_result = await self.db.execute_insert(self.query, values)
+       insert_result = await self.db.execute_insert(self.insert_query, values)
        await self._process_insert_result(instance, insert_result)

    async def execute_bulk_insert(self, instances):
@@ -99,39 +126,45 @@ async def execute_bulk_insert(self, instances):
            ]
            for instance in instances
        ]
-       await self.db.execute_many(self.query, values_lists)
+       await self.db.execute_many(self.insert_query, values_lists)

+   def get_update_sql(self, update_fields: Optional[List[str]]) -> str:
+       """
+       Generates the SQL for updating a model depending on provided update_fields.
+       Result is cached for performance.
+       """
+       key = ",".join(update_fields) if update_fields else ""
+       if key in self.update_cache:
+           return self.update_cache[key]
+
-   async def execute_update(self, instance, update_fields):
        table = Table(self.model._meta.table)
        query = self.db.query_class.update(table)
-       if update_fields:
-           for field in update_fields:
-               db_field = self.model._meta.fields_db_projection[field]
-               field_object = self.model._meta.fields_map[field]
-               if not field_object.generated:
-                   query = query.set(
-                       db_field, self.column_map[field](getattr(instance, field), instance)
-                   )
-       else:
-           for field, db_field in self.model._meta.fields_db_projection.items():
-               field_object = self.model._meta.fields_map[field]
-               if not field_object.generated:
-                   query = query.set(
-                       db_field, self.column_map[field](getattr(instance, field), instance)
-                   )
-       query = query.where(
-           getattr(table, self.model._meta.db_pk_field)
-           == self.model._meta.pk.to_db_value(instance.pk, instance)
-       )
-       await self.db.execute_query(query.get_sql())
+       count = 0
+       for field in update_fields or self.model._meta.fields_db_projection.keys():
+           db_field = self.model._meta.fields_db_projection[field]
+           field_object = self.model._meta.fields_map[field]
+           if not field_object.pk:
+               query = query.set(db_field, self.Parameter(count))
+               count += 1
+
+       query = query.where(getattr(table, self.model._meta.db_pk_field) == self.Parameter(count))
+
+       sql = self.update_cache[key] = query.get_sql()
+       return sql
+
+   async def execute_update(self, instance, update_fields: Optional[List[str]]) -> None:
+       values = [
+           self.column_map[field](getattr(instance, field), instance)
+           for field in update_fields or self.model._meta.fields_db_projection.keys()
+           if not self.model._meta.fields_map[field].pk
+       ]
+       values.append(self.model._meta.pk.to_db_value(instance.pk, instance))
+       await self.db.execute_query(self.get_update_sql(update_fields), values)

    async def execute_delete(self, instance):
-       table = Table(self.model._meta.table)
-       query = self.model._meta.basequery.where(
-           getattr(table, self.model._meta.db_pk_field)
-           == self.model._meta.pk.to_db_value(instance.pk, instance)
-       ).delete()
-       await self.db.execute_query(query.get_sql())
+       await self.db.execute_query(
+           self.delete_query, [self.model._meta.pk.to_db_value(instance.pk, instance)]
+       )
        return instance

    async def _prefetch_reverse_relation(
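Putting the pieces together, a hedged standalone pypika sketch of the UPDATE and DELETE statements the executor now caches, and the value lists it binds at execution time. The `event` table, its columns, and the PostgreSQL-style placeholders are illustrative:

```python
# Standalone approximation of get_update_sql() and the cached delete query.
from pypika import Parameter, PostgreSQLQuery, Table

table = Table("event")

# UPDATE: one placeholder per non-pk column, then one for the pk in the WHERE clause.
update_fields = ["name", "modified"]
update_q = PostgreSQLQuery.update(table)
count = 0
for db_field in update_fields:
    update_q = update_q.set(db_field, Parameter("$%d" % (count + 1,)))
    count += 1
update_q = update_q.where(table.id == Parameter("$%d" % (count + 1,)))
print(update_q.get_sql())
# e.g. UPDATE "event" SET "name"=$1,"modified"=$2 WHERE "id"=$3

# DELETE: built once in __init__ with the pk as the single parameter.
delete_q = PostgreSQLQuery.from_(table).where(table.id == Parameter("$1")).delete()
print(delete_q.get_sql())
# e.g. DELETE FROM "event" WHERE "id"=$1

# At execution time the executor only binds values, in the same order:
#   execute_update -> [to_db_value(name), to_db_value(modified), pk]
#   execute_delete -> [pk]
```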
8 changes: 5 additions & 3 deletions tortoise/backends/mysql/client.py
@@ -175,11 +175,13 @@ async def execute_many(self, query: str, values: list) -> None:

    @translate_exceptions
    @retry_connection
-   async def execute_query(self, query: str) -> List[aiomysql.DictCursor]:
+   async def execute_query(
+       self, query: str, values: Optional[list] = None
+   ) -> List[aiomysql.DictCursor]:
        async with self.acquire_connection() as connection:
-           self.log.debug(query)
+           self.log.debug("%s: %s", query, values)
            async with connection.cursor(aiomysql.DictCursor) as cursor:
-               await cursor.execute(query)
+               await cursor.execute(query, values)
                return await cursor.fetchall()

    @translate_exceptions
12 changes: 3 additions & 9 deletions tortoise/backends/mysql/executor.py
@@ -1,6 +1,4 @@
-from typing import List
-
-from pypika import MySQLQuery, Parameter, Table, functions
+from pypika import Parameter, functions
from pypika.enums import SqlTypes

from tortoise import Model
@@ -57,12 +55,8 @@ class MySQLExecutor(BaseExecutor):
    }
    EXPLAIN_PREFIX = "EXPLAIN FORMAT=JSON"

-   def _prepare_insert_statement(self, columns: List[str]) -> str:
-       return str(
-           MySQLQuery.into(Table(self.model._meta.table))
-           .columns(*columns)
-           .insert(*[Parameter("%s") for _ in range(len(columns))])
-       )
+   def Parameter(self, pos: int) -> Parameter:
+       return Parameter("%s")

    async def _process_insert_result(self, instance: Model, results: int):
        pk_field_object = self.model._meta.pk
6 changes: 3 additions & 3 deletions tortoise/backends/sqlite/client.py
@@ -113,10 +113,10 @@ async def execute_many(self, query: str, values: List[list]) -> None:
            await connection.executemany(query, values)

    @translate_exceptions
-   async def execute_query(self, query: str) -> List[dict]:
+   async def execute_query(self, query: str, values: Optional[list] = None) -> List[dict]:
        async with self.acquire_connection() as connection:
-           self.log.debug(query)
-           res = [dict(row) for row in await connection.execute_fetchall(query)]
+           self.log.debug("%s: %s", query, values)
+           res = [dict(row) for row in await connection.execute_fetchall(query, values)]
            return res

    @translate_exceptions
12 changes: 4 additions & 8 deletions tortoise/backends/sqlite/executor.py
@@ -1,8 +1,8 @@
import datetime
from decimal import Decimal
-from typing import List, Optional
+from typing import Optional

-from pypika import Parameter, Table
+from pypika import Parameter

from tortoise import Model, fields
from tortoise.backends.base.executor import BaseExecutor
@@ -47,12 +47,8 @@ class SqliteExecutor(BaseExecutor):
    }
    EXPLAIN_PREFIX = "EXPLAIN QUERY PLAN"

-   def _prepare_insert_statement(self, columns: List[str]) -> str:
-       return str(
-           self.db.query_class.into(Table(self.model._meta.table))
-           .columns(*columns)
-           .insert(*[Parameter("?") for _ in range(len(columns))])
-       )
+   def Parameter(self, pos: int) -> Parameter:
+       return Parameter("?")

    async def _process_insert_result(self, instance: Model, results: int):
        pk_field_object = self.model._meta.pk
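Across the three backends the only dialect-specific piece is now the placeholder style supplied by the small `Parameter()` hook. A minimal standalone recap, mirroring the overrides in this diff rather than quoting them verbatim:

```python
# Per-dialect placeholder styles wired in by this PR (sketch, not library code).
from pypika import Parameter


def asyncpg_parameter(pos: int) -> Parameter:  # PostgreSQL: $1, $2, ...
    return Parameter("$%d" % (pos + 1,))


def mysql_parameter(pos: int) -> Parameter:  # MySQL (aiomysql): %s for every position
    return Parameter("%s")


def sqlite_parameter(pos: int) -> Parameter:  # SQLite (aiosqlite): ? for every position
    return Parameter("?")


print(asyncpg_parameter(0).get_sql(), mysql_parameter(0).get_sql(), sqlite_parameter(0).get_sql())
# e.g. $1 %s ?
```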
