Skip to content

Commit b9372f5

Browse files
committed
Remove non-working pooling, and any reference to single_connection.
1 parent 2e5f68d commit b9372f5

File tree

9 files changed

+32
-188
lines changed

9 files changed

+32
-188
lines changed

tortoise/backends/asyncpg/client.py

Lines changed: 10 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@
88
from tortoise.backends.asyncpg.executor import AsyncpgExecutor
99
from tortoise.backends.asyncpg.schema_generator import AsyncpgSchemaGenerator
1010
from tortoise.backends.base.client import (BaseDBAsyncClient, BaseTransactionWrapper,
11-
ConnectionWrapper, SingleConnectionWrapper)
12-
from tortoise.exceptions import (ConfigurationError, DBConnectionError, IntegrityError,
13-
OperationalError, TransactionManagementError)
11+
ConnectionWrapper)
12+
from tortoise.exceptions import (DBConnectionError, IntegrityError, OperationalError,
13+
TransactionManagementError)
1414
from tortoise.transactions import current_transaction_map
1515

1616

@@ -49,7 +49,7 @@ def __init__(self, user: str, password: str, database: str, host: str, port: Sup
4949
port=self.port,
5050
database=self.database
5151
)
52-
self._db_pool = None # Type: Optional[asyncpg.pool.Pool]
52+
# self._db_pool = None # Type: Optional[asyncpg.pool.Pool]
5353
self._connection = None # Type: Optional[asyncpg.Connection]
5454

5555
self._transaction_class = type(
@@ -58,10 +58,7 @@ def __init__(self, user: str, password: str, database: str, host: str, port: Sup
5858

5959
async def create_connection(self) -> None:
6060
try:
61-
if not self.single_connection:
62-
self._db_pool = await asyncpg.create_pool(self.dsn)
63-
else:
64-
self._connection = await asyncpg.connect(self.dsn)
61+
self._connection = await asyncpg.connect(self.dsn)
6562
self.log.debug(
6663
'Created connection with params: user=%s database=%s host=%s port=%s',
6764
self.user, self.database, self.host, self.port
@@ -72,14 +69,11 @@ async def create_connection(self) -> None:
7269
))
7370

7471
async def close(self) -> None:
75-
if self._db_pool:
76-
await self._db_pool.close()
7772
if self._connection:
7873
await self._connection.close()
74+
self._connection = None
7975

8076
async def db_create(self) -> None:
81-
single_connection = self.single_connection
82-
self.single_connection = True
8377
self._connection = await asyncpg.connect(self.DSN_TEMPLATE.format(
8478
user=self.user,
8579
password=self.password,
@@ -91,11 +85,8 @@ async def db_create(self) -> None:
9185
'CREATE DATABASE "{}" OWNER "{}"'.format(self.database, self.user)
9286
)
9387
await self._connection.close() # type: ignore
94-
self.single_connection = single_connection
9588

9689
async def db_delete(self) -> None:
97-
single_connection = self.single_connection
98-
self.single_connection = True
9990
self._connection = await asyncpg.connect(self.DSN_TEMPLATE.format(
10091
user=self.user,
10192
password=self.password,
@@ -105,22 +96,15 @@ async def db_delete(self) -> None:
10596
))
10697
try:
10798
await self.execute_script('DROP DATABASE "{}"'.format(self.database))
108-
except asyncpg.InvalidCatalogNameError:
99+
except asyncpg.InvalidCatalogNameError: # pragma: nocoverage
109100
pass
110101
await self._connection.close() # type: ignore
111-
self.single_connection = single_connection
112102

113103
def acquire_connection(self) -> ConnectionWrapper:
114-
if not self.single_connection:
115-
return self._db_pool.acquire() # type: ignore
116-
else:
117-
return ConnectionWrapper(self._connection)
104+
return ConnectionWrapper(self._connection)
118105

119106
def _in_transaction(self) -> 'TransactionWrapper':
120-
if self.single_connection:
121-
return self._transaction_class(self.connection_name, connection=self._connection)
122-
else:
123-
return self._transaction_class(self.connection_name, pool=self._db_pool)
107+
return self._transaction_class(self.connection_name, self._connection)
124108

125109
@translate_exceptions
126110
async def execute_insert(self, query: str, values: list) -> int:
@@ -142,29 +126,11 @@ async def execute_script(self, query: str) -> None:
142126
async with self.acquire_connection() as connection:
143127
await connection.execute(query)
144128

145-
async def get_single_connection(self):
146-
if self.single_connection:
147-
return self._single_connection_class(self.connection_name, self._connection)
148-
else:
149-
connection = await self._db_pool._acquire(None)
150-
return self._single_connection_class(self.connection_name, connection)
151-
152-
async def release_single_connection(self, single_connection):
153-
if not self.single_connection:
154-
await self._db_pool.release(single_connection.connection)
155-
156129

157130
class TransactionWrapper(AsyncpgDBClient, BaseTransactionWrapper):
158-
def __init__(self, connection_name: str, pool=None, connection=None) -> None:
159-
if pool and connection:
160-
raise ConfigurationError('You must pass either connection or pool')
131+
def __init__(self, connection_name: str, connection) -> None:
161132
self._connection = connection
162133
self.log = logging.getLogger('db_client')
163-
self._pool = pool
164-
self.single_connection = True
165-
self._single_connection_class = type(
166-
'SingleConnectionWrapper', (SingleConnectionWrapper, self.__class__), {}
167-
)
168134
self._transaction_class = self.__class__
169135
self._old_context_value = None
170136
self.connection_name = connection_name
@@ -173,12 +139,7 @@ def __init__(self, connection_name: str, pool=None, connection=None) -> None:
173139
def acquire_connection(self) -> ConnectionWrapper:
174140
return ConnectionWrapper(self._connection)
175141

176-
async def _get_connection(self):
177-
return await self._pool._acquire(None)
178-
179142
async def start(self):
180-
if not self._connection:
181-
self._connection = await self._get_connection()
182143
self.transaction = self._connection.transaction()
183144
await self.transaction.start()
184145
current_transaction = current_transaction_map[self.connection_name]
@@ -190,17 +151,11 @@ async def commit(self):
190151
await self.transaction.commit()
191152
except asyncpg.exceptions._base.InterfaceError as exc:
192153
raise TransactionManagementError(exc)
193-
if self._pool:
194-
await self._pool.release(self._connection)
195-
self._connection = None
196154
current_transaction_map[self.connection_name].set(self._old_context_value)
197155

198156
async def rollback(self):
199157
try:
200158
await self.transaction.rollback()
201159
except asyncpg.exceptions._base.InterfaceError as exc:
202160
raise TransactionManagementError(exc)
203-
if self._pool:
204-
await self._pool.release(self._connection)
205-
self._connection = None
206161
current_transaction_map[self.connection_name].set(self._old_context_value)

tortoise/backends/asyncpg/executor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,6 @@
88
class AsyncpgExecutor(BaseExecutor):
99
def _prepare_insert_statement(self, columns: List[str]) -> str:
1010
return str(
11-
self.connection.query_class.into(Table(self.model._meta.table)).columns(*columns)
11+
self.db.query_class.into(Table(self.model._meta.table)).columns(*columns)
1212
.insert('???').returning('id')
1313
).replace("'???'", ','.join(['$%d' % (i + 1, ) for i in range(len(columns))]))

tortoise/backends/base/client.py

Lines changed: 1 addition & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,9 @@ class BaseDBAsyncClient:
1212
executor_class = BaseExecutor
1313
schema_generator = BaseSchemaGenerator
1414

15-
def __init__(self, connection_name: str, single_connection: bool = True, **kwargs) -> None:
15+
def __init__(self, connection_name: str, **kwargs) -> None:
1616
self.log = logging.getLogger('db_client')
17-
self.single_connection = single_connection
1817
self.connection_name = connection_name
19-
self._single_connection_class = type(
20-
'SingleConnectionWrapper', (SingleConnectionWrapper, self.__class__), {}
21-
)
2218

2319
async def create_connection(self) -> None:
2420
raise NotImplementedError() # pragma: nocoverage
@@ -47,12 +43,6 @@ async def execute_query(self, query: str) -> Sequence[dict]:
4743
async def execute_script(self, query: str) -> None:
4844
raise NotImplementedError() # pragma: nocoverage
4945

50-
async def get_single_connection(self) -> 'BaseDBAsyncClient':
51-
raise NotImplementedError() # pragma: nocoverage
52-
53-
async def release_single_connection(self, single_connection: 'BaseDBAsyncClient') -> None:
54-
raise NotImplementedError() # pragma: nocoverage
55-
5646

5747
class ConnectionWrapper:
5848
__slots__ = ('connection', )
@@ -67,35 +57,6 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):
6757
pass
6858

6959

70-
class SingleConnectionWrapper(BaseDBAsyncClient):
71-
# pylint: disable=W0223,W0231
72-
73-
def __init__(self, connection_name: str, connection, closing_callback=None) -> None:
74-
self.log = logging.getLogger('db_client')
75-
self.connection_name = connection_name
76-
self.connection = connection
77-
self.single_connection = True
78-
self.closing_callback = closing_callback
79-
80-
def acquire_connection(self) -> ConnectionWrapper:
81-
return ConnectionWrapper(self.connection)
82-
83-
async def get_single_connection(self) -> 'SingleConnectionWrapper':
84-
# Real class object is generated in runtime, so we use __class__ reference
85-
# instead of using SingleConnectionWrapper directly
86-
return self.__class__(self.connection_name, self.connection, self)
87-
88-
async def release_single_connection(self, single_connection: 'BaseDBAsyncClient') -> None:
89-
return
90-
91-
async def __aenter__(self):
92-
return self
93-
94-
async def __aexit__(self, exc_type, exc_val, exc_tb):
95-
if self.closing_callback:
96-
await self.closing_callback(self)
97-
98-
9960
class BaseTransactionWrapper:
10061
async def start(self) -> None:
10162
raise NotImplementedError() # pragma: nocoverage

tortoise/backends/base/config_generator.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@ def expand_db_url(db_url: str, testing: bool = False) -> dict:
5757
params[key] = val[-1]
5858

5959
if testing:
60-
params['single_connection'] = True
6160
path = path.replace('\\{', '{').replace('\\}', '}')
6261
path = path.format(uuid.uuid4().hex)
6362

tortoise/backends/base/executor.py

Lines changed: 8 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,11 @@ class BaseExecutor:
1515
def __init__(self, model, db=None, prefetch_map=None, prefetch_queries=None):
1616
self.model = model
1717
self.db = db
18-
self.connection = None
1918
self.prefetch_map = prefetch_map if prefetch_map else {}
2019
self._prefetch_queries = prefetch_queries if prefetch_queries else {}
2120

2221
async def execute_select(self, query, custom_fields=None):
23-
self.connection = await self.db.get_single_connection()
24-
raw_results = await self.connection.execute_query(str(query))
22+
raw_results = await self.db.execute_query(str(query))
2523
instance_list = []
2624
for row in raw_results:
2725
instance = self.model(**row)
@@ -30,8 +28,6 @@ async def execute_select(self, query, custom_fields=None):
3028
setattr(instance, field, row[field])
3129
instance_list.append(instance)
3230
await self._execute_prefetch_queries(instance_list)
33-
await self.db.release_single_connection(self.connection)
34-
self.connection = None
3531
return instance_list
3632

3733
def _prepare_insert_columns(self):
@@ -60,7 +56,6 @@ def _prepare_insert_statement(self, columns: List[str]) -> str:
6056
raise NotImplementedError() # pragma: nocoverage
6157

6258
async def execute_insert(self, instance):
63-
self.connection = await self.db.get_single_connection()
6459
key = '{}:{}'.format(self.db.connection_name, self.model._meta.table)
6560
if key not in INSERT_CACHE:
6661
regular_columns, columns = self._prepare_insert_columns()
@@ -73,15 +68,12 @@ async def execute_insert(self, instance):
7368
instance=instance,
7469
regular_columns=regular_columns,
7570
)
76-
instance.id = await self.connection.execute_insert(query, values)
77-
await self.db.release_single_connection(self.connection)
78-
self.connection = None
71+
instance.id = await self.db.execute_insert(query, values)
7972
return instance
8073

8174
async def execute_update(self, instance):
82-
self.connection = await self.db.get_single_connection()
8375
table = Table(self.model._meta.table)
84-
query = self.connection.query_class.update(table)
76+
query = self.db.query_class.update(table)
8577
for field, db_field in self.model._meta.fields_db_projection.items():
8678
field_object = self.model._meta.fields_map[field]
8779
if not field_object.generated:
@@ -90,9 +82,7 @@ async def execute_update(self, instance):
9082
self._field_to_db(field_object, getattr(instance, field), instance)
9183
)
9284
query = query.where(table.id == instance.id)
93-
await self.connection.execute_query(str(query))
94-
await self.db.release_single_connection(self.connection)
95-
self.connection = None
85+
await self.db.execute_query(str(query))
9686
return instance
9787

9888
async def execute_delete(self, instance):
@@ -133,7 +123,7 @@ async def _prefetch_m2m_relation(self, instance_list, field, related_query):
133123

134124
through_table = Table(field_object.through)
135125

136-
subquery = self.connection.query_class.from_(through_table).select(
126+
subquery = self.db.query_class.from_(through_table).select(
137127
getattr(through_table, field_object.backward_key).as_('_backward_relation_key'),
138128
getattr(through_table, field_object.forward_key).as_('_forward_relation_key')
139129
).where(getattr(through_table, field_object.backward_key).isin(instance_id_set))
@@ -145,12 +135,12 @@ async def _prefetch_m2m_relation(self, instance_list, field, related_query):
145135
subquery._backward_relation_key.as_('_backward_relation_key'),
146136
*[getattr(related_query_table, field).as_(field) for field in related_query.fields]
147137
)
148-
raw_results = await self.connection.execute_query(str(query))
138+
raw_results = await self.db.execute_query(str(query))
149139
relations = {(e['_backward_relation_key'], e['id']) for e in raw_results}
150140
related_object_list = [related_query.model(**e) for e in raw_results]
151141
await self.__class__(
152142
model=related_query.model,
153-
db=self.connection,
143+
db=self.db,
154144
prefetch_map=related_query._prefetch_map,
155145
).fetch_for_list(related_object_list)
156146
related_object_map = {e.id: e for e in related_object_list}
@@ -188,7 +178,7 @@ def _make_prefetch_queries(self):
188178
else:
189179
related_model_field = self.model._meta.fields_map.get(field)
190180
related_model = related_model_field.type
191-
related_query = related_model.all().using_db(self.connection)
181+
related_query = related_model.all().using_db(self.db)
192182
if forwarded_prefetches:
193183
related_query = related_query.prefetch_related(*forwarded_prefetches)
194184
self._prefetch_queries[field] = related_query
@@ -209,7 +199,6 @@ async def _execute_prefetch_queries(self, instance_list):
209199
return instance_list
210200

211201
async def fetch_for_list(self, instance_list, *args):
212-
self.connection = await self.db.get_single_connection()
213202
self.prefetch_map = {}
214203
for relation in args:
215204
relation_split = relation.split('__')
@@ -223,8 +212,6 @@ async def fetch_for_list(self, instance_list, *args):
223212
if forwarded_prefetch:
224213
self.prefetch_map[first_level_field].add(forwarded_prefetch)
225214
await self._execute_prefetch_queries(instance_list)
226-
await self.db.release_single_connection(self.connection)
227-
self.connection = None
228215
return instance_list
229216

230217
@classmethod

0 commit comments

Comments
 (0)