Skip to content

Commit

Permalink
Retry test suite queries after TransactionConflictError (geldata#8249)
Browse files Browse the repository at this point in the history
Only outside of transactions. This should reduce flakes
on global DDL conflicts, among other things.
  • Loading branch information
msullivan authored Jan 24, 2025
1 parent 414380a commit ea5afab
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 17 deletions.
61 changes: 46 additions & 15 deletions edb/testbase/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,12 @@ async def commit(self) -> None:
async def _commit(self) -> None:
query = self._make_commit_query()
try:
await self._connection.execute(query)
# Use _fetchall to ensure there is no retry performed.
# The protocol level apparently thinks the transaction is
# over if COMMIT fails, and since we use that to decide
# whether to retry in query/execute, it would want to
# retry a COMMIT.
await self._connection._fetchall(query)
except BaseException:
self._state = TransactionState.FAILED
raise
Expand Down Expand Up @@ -391,29 +396,55 @@ def _shallow_clone(self):
def _get_query_cache(self) -> abstract.QueryCache:
return self._query_cache

async def ensure_connected(self):
if self.is_closed():
await self.connect()
return self

async def _query(self, query_context: abstract.QueryContext):
await self.ensure_connected()
return await self.raw_query(query_context)

async def _retry_operation(self, func):
i = 0
while True:
i += 1
try:
return await func()
# Retry transaction conflict errors, up to a maximum of 5
# times. We don't do this if we are in a transaction,
# since that *ought* to be done at the transaction level.
# Though in reality in the test suite it is usually done at the
# test runner level.
except errors.TransactionConflictError:
if i >= 5 or self.is_in_transaction():
raise
await asyncio.sleep(
min((2 ** i) * 0.1, 10)
+ random.randrange(100) * 0.001
)

async def _execute(self, script: abstract.ExecuteContext) -> None:
await self.ensure_connected()
ctx = script.lower(allow_capabilities=edgedb_enums.Capability.ALL)
res = await self._protocol.execute(ctx)
if ctx.warnings:
script.warning_handler(ctx.warnings, res)

async def ensure_connected(self):
if self.is_closed():
await self.connect()
return self
async def _inner():
ctx = script.lower(allow_capabilities=edgedb_enums.Capability.ALL)
res = await self._protocol.execute(ctx)
if ctx.warnings:
script.warning_handler(ctx.warnings, res)

await self._retry_operation(_inner)

async def raw_query(self, query_context: abstract.QueryContext):
ctx = query_context.lower(
allow_capabilities=edgedb_enums.Capability.ALL)
res = await self._protocol.query(ctx)
if ctx.warnings:
res = query_context.warning_handler(ctx.warnings, res)
return res
async def _inner():
ctx = query_context.lower(
allow_capabilities=edgedb_enums.Capability.ALL)
res = await self._protocol.query(ctx)
if ctx.warnings:
res = query_context.warning_handler(ctx.warnings, res)
return res

return await self._retry_operation(_inner)

async def _fetchall_generic(self, ctx):
await self.ensure_connected()
Expand Down
6 changes: 4 additions & 2 deletions tests/test_server_proto.py
Original file line number Diff line number Diff line change
Expand Up @@ -3395,7 +3395,8 @@ async def test_server_proto_concurrent_ddl(self):
# deferred_shield ensures that none of the
# operations get cancelled, which allows us to
# aclose them all cleanly.
g.create_task(asyncutil.deferred_shield(con.execute(f'''
# Use _fetchall, because it doesn't retry
g.create_task(asyncutil.deferred_shield(con._fetchall(f'''
CREATE TYPE {typename_prefix}{i} {{
CREATE REQUIRED PROPERTY prop1 -> std::int64;
}};
Expand Down Expand Up @@ -3441,7 +3442,8 @@ async def test_server_proto_concurrent_global_ddl(self):
# deferred_shield ensures that none of the
# operations get cancelled, which allows us to
# aclose them all cleanly.
g.create_task(asyncutil.deferred_shield(con.execute(f'''
# Use _fetchall, because it doesn't retry
g.create_task(asyncutil.deferred_shield(con._fetchall(f'''
CREATE SUPERUSER ROLE concurrent_{i}
''')))
except ExceptionGroup as e:
Expand Down

0 comments on commit ea5afab

Please sign in to comment.