Race condition errors in new refresh_schemas() mechanism #1231

Closed
simonw opened this issue Feb 18, 2021 · 11 comments

@simonw
Owner

simonw commented Feb 18, 2021

I tried running a Locust load test against Datasette and hit an error message about a failure to create tables because they already existed. I think this means there are race conditions in the new refresh_schemas() mechanism added in #1150.

simonw added the bug label Feb 18, 2021
@simonw
Owner Author

simonw commented Feb 18, 2021

I started trying to use locks to resolve this but I've not figured out the right way to do that yet - here's my first experiment:

diff --git a/datasette/app.py b/datasette/app.py
index 9e15a16..1681c9d 100644
--- a/datasette/app.py
+++ b/datasette/app.py
@@ -217,6 +217,7 @@ class Datasette:
         self.inspect_data = inspect_data
         self.immutables = set(immutables or [])
         self.databases = collections.OrderedDict()
+        self._refresh_schemas_lock = threading.Lock()
         if memory or not self.files:
             self.add_database(Database(self, is_memory=True), name="_memory")
         # memory_name is a random string so that each Datasette instance gets its own
@@ -324,6 +325,13 @@ class Datasette:
         self.client = DatasetteClient(self)
 
     async def refresh_schemas(self):
+        return
+        if self._refresh_schemas_lock.locked():
+            return
+        with self._refresh_schemas_lock:
+            await self._refresh_schemas()
+
+    async def _refresh_schemas(self):
         internal_db = self.databases["_internal"]
         if not self.internal_db_created:
             await init_internal_db(internal_db)

@simonw
Owner Author

simonw commented Feb 18, 2021

Ideally I'd figure out a way to replicate this error in a concurrent unit test.
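
One way such a test might look, as a minimal sketch rather than Datasette's actual code: `ToyApp` below is a hypothetical stand-in that copies the check / await / set-flag shape of `refresh_schemas()`, with an in-memory SQLite connection in place of the `_internal` database. `asyncio.gather()` fires several calls at once, and `asyncio.sleep(0)` is an assumed stand-in for the real awaited write that lets other tasks run.

```python
import asyncio
import sqlite3


class ToyApp:
    """Hypothetical stand-in for the check-then-await-then-set pattern."""

    def __init__(self):
        self.conn = sqlite3.connect(":memory:")
        self.internal_db_created = False

    async def init_internal_db(self):
        # Yield to the event loop, as a real awaited write would
        await asyncio.sleep(0)
        self.conn.execute("CREATE TABLE databases (database_name TEXT)")

    async def refresh_schemas(self):
        if not self.internal_db_created:
            await self.init_internal_db()
            self.internal_db_created = True


async def run_concurrently(n=5):
    app = ToyApp()
    # return_exceptions=True collects failures instead of raising the first one
    return await asyncio.gather(
        *(app.refresh_schemas() for _ in range(n)),
        return_exceptions=True,
    )


results = asyncio.run(run_concurrently())
errors = [r for r in results if isinstance(r, sqlite3.OperationalError)]
```

In this toy version every call after the first fails on `CREATE TABLE`, so a test could assert that `errors` is non-empty before a fix and empty after it. Whether the same approach triggers the failure against a real Datasette instance is not something this sketch establishes.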

@simonw
Owner Author

simonw commented Jul 16, 2021

This just broke the datasette-graphql test suite: simonw/datasette-graphql#77 - I need to figure out a solution here.

@simonw
Owner Author

simonw commented Jul 16, 2021

Here's the traceback I got from datasette-graphql (annoyingly, the failure only shows up when running the tests in GitHub Actions CI - I've not been able to replicate it on my laptop yet):

tests/test_utils.py .                                                    [100%]

=================================== FAILURES ===================================
_________________________ test_graphql_examples[path0] _________________________

ds = <datasette.app.Datasette object at 0x7f6b8b6f8fd0>
path = PosixPath('/home/runner/work/datasette-graphql/datasette-graphql/examples/filters.md')

    @pytest.mark.asyncio
    @pytest.mark.parametrize(
        "path", (pathlib.Path(__file__).parent.parent / "examples").glob("*.md")
    )
    async def test_graphql_examples(ds, path):
        content = path.read_text()
        query = graphql_re.search(content)[1]
        try:
            variables = variables_re.search(content)[1]
        except TypeError:
            variables = "{}"
        expected = json.loads(json_re.search(content)[1])
        response = await ds.client.post(
            "/graphql",
            json={
                "query": query,
                "variables": json.loads(variables),
            },
        )
>       assert response.status_code == 200, response.json()
E       AssertionError: {'data': {'repos_arraycontains': None, 'users_contains': None, 'users_date': None, 'users_endswith': None, ...}, 'erro...", 'path': ['users_gt']}, {'locations': [{'column': 5, 'line': 34}], 'message': "'rows'", 'path': ['users_gte']}, ...]}
E       assert 500 == 200
E        +  where 500 = <Response [500 Internal Server Error]>.status_code

tests/test_graphql.py:142: AssertionError
----------------------------- Captured stderr call -----------------------------
table databases already exists
table databases already exists
table databases already exists
table databases already exists
table databases already exists
table databases already exists
table databases already exists
table databases already exists
table databases already exists
table databases already exists
table databases already exists
table databases already exists
table databases already exists
table databases already exists
table databases already exists
table databases already exists
table databases already exists
table databases already exists
table databases already exists
table databases already exists
table databases already exists
Traceback (most recent call last):
  File "/opt/hostedtoolcache/Python/3.7.11/x64/lib/python3.7/site-packages/datasette/app.py", line 1171, in route_path
    response = await view(request, send)
  File "/opt/hostedtoolcache/Python/3.7.11/x64/lib/python3.7/site-packages/datasette/views/base.py", line 151, in view
    request, **request.scope["url_route"]["kwargs"]
  File "/opt/hostedtoolcache/Python/3.7.11/x64/lib/python3.7/site-packages/datasette/views/base.py", line 123, in dispatch_request
    await self.ds.refresh_schemas()
  File "/opt/hostedtoolcache/Python/3.7.11/x64/lib/python3.7/site-packages/datasette/app.py", line 338, in refresh_schemas
    await init_internal_db(internal_db)
  File "/opt/hostedtoolcache/Python/3.7.11/x64/lib/python3.7/site-packages/datasette/utils/internal_db.py", line 16, in init_internal_db
    block=True,
  File "/opt/hostedtoolcache/Python/3.7.11/x64/lib/python3.7/site-packages/datasette/database.py", line 102, in execute_write
    return await self.execute_write_fn(_inner, block=block)
  File "/opt/hostedtoolcache/Python/3.7.11/x64/lib/python3.7/site-packages/datasette/database.py", line 118, in execute_write_fn
    raise result
  File "/opt/hostedtoolcache/Python/3.7.11/x64/lib/python3.7/site-packages/datasette/database.py", line 139, in _execute_writes
    result = task.fn(conn)
  File "/opt/hostedtoolcache/Python/3.7.11/x64/lib/python3.7/site-packages/datasette/database.py", line 100, in _inner
    return conn.execute(sql, params or [])
sqlite3.OperationalError: table databases already exists

@simonw
Owner Author

simonw commented Jul 16, 2021

The race condition happens inside this method - initially with the call to await init_internal_db():

datasette/datasette/app.py

Lines 334 to 359 in dd5ee8e

async def refresh_schemas(self):
    internal_db = self.databases["_internal"]
    if not self.internal_db_created:
        await init_internal_db(internal_db)
        self.internal_db_created = True
    current_schema_versions = {
        row["database_name"]: row["schema_version"]
        for row in await internal_db.execute(
            "select database_name, schema_version from databases"
        )
    }
    for database_name, db in self.databases.items():
        schema_version = (await db.execute("PRAGMA schema_version")).first()[0]
        # Compare schema versions to see if we should skip it
        if schema_version == current_schema_versions.get(database_name):
            continue
        await internal_db.execute_write(
            """
            INSERT OR REPLACE INTO databases (database_name, path, is_memory, schema_version)
            VALUES (?, ?, ?, ?)
            """,
            [database_name, str(db.path), db.is_memory, schema_version],
            block=True,
        )
        await populate_schema_tables(internal_db, db)
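
To make the interleaving concrete, here is a small trace, offered as an illustrative sketch only. The function names mirror the method above, but the bodies are assumptions: `asyncio.sleep(0)` stands in for the awaited database write, and the `events` list exists purely to record ordering. The point is that the flag is set only after the `await` returns, so a second task can pass the `if not created` check while the first is still suspended.

```python
import asyncio

events = []
created = False


async def init_internal_db(name):
    events.append(f"{name}: init start")
    await asyncio.sleep(0)  # suspension point: other tasks run here
    events.append(f"{name}: init done")


async def refresh_schemas(name):
    global created
    if not created:  # both tasks see False here
        await init_internal_db(name)
        created = True  # set only after the await returns


async def main():
    await asyncio.gather(refresh_schemas("A"), refresh_schemas("B"))


asyncio.run(main())
```

After this runs, `events` shows both tasks starting initialization before either finishes, which is the window in which a second `CREATE TABLE` would be issued.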

@simonw
Owner Author

simonw commented Jul 16, 2021

The only place that calls refresh_schemas() is here:

async def dispatch_request(self, request, *args, **kwargs):
    if self.ds:
        await self.ds.refresh_schemas()
    handler = getattr(self, request.method.lower(), None)
    return await handler(request, *args, **kwargs)

Ideally only one call to refresh_schemas() would be running at any one time.

@simonw
Owner Author

simonw commented Jul 16, 2021

https://stackoverflow.com/a/25799871/6083 has a good example of using asyncio.Lock():

stuff_lock = asyncio.Lock()

async def get_stuff(url):
    async with stuff_lock:
        if url in cache:
            return cache[url]
        stuff = await aiohttp.request('GET', url)
        cache[url] = stuff
        return stuff

@simonw
Owner Author

simonw commented Jul 16, 2021

Second attempt at this:

diff --git a/datasette/app.py b/datasette/app.py
index 5976d8b..5f348cb 100644
--- a/datasette/app.py
+++ b/datasette/app.py
@@ -224,6 +224,7 @@ class Datasette:
         self.inspect_data = inspect_data
         self.immutables = set(immutables or [])
         self.databases = collections.OrderedDict()
+        self._refresh_schemas_lock = asyncio.Lock()
         self.crossdb = crossdb
         if memory or crossdb or not self.files:
             self.add_database(Database(self, is_memory=True), name="_memory")
@@ -332,6 +333,12 @@ class Datasette:
         self.client = DatasetteClient(self)
 
     async def refresh_schemas(self):
+        if self._refresh_schemas_lock.locked():
+            return
+        async with self._refresh_schemas_lock:
+            await self._refresh_schemas()
+
+    async def _refresh_schemas(self):
         internal_db = self.databases["_internal"]
         if not self.internal_db_created:
             await init_internal_db(internal_db)
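
The behavior of that `locked()` check plus `async with` pattern can be seen in isolation with a toy class. This is a hedged sketch, not Datasette's code: `ToyApp`, `init_calls`, and `skipped` are hypothetical names added for illustration, and `asyncio.sleep(0)` stands in for `init_internal_db()`.

```python
import asyncio


class ToyApp:
    """Hypothetical stand-in, not Datasette's real class."""

    def __init__(self):
        self._refresh_schemas_lock = asyncio.Lock()
        self.internal_db_created = False
        self.init_calls = 0
        self.skipped = 0

    async def refresh_schemas(self):
        if self._refresh_schemas_lock.locked():
            self.skipped += 1  # a refresh is already in flight, so skip
            return
        async with self._refresh_schemas_lock:
            await self._refresh_schemas()

    async def _refresh_schemas(self):
        if not self.internal_db_created:
            self.init_calls += 1
            await asyncio.sleep(0)  # stand-in for init_internal_db()
            self.internal_db_created = True


async def main():
    app = ToyApp()  # created inside the running event loop
    await asyncio.gather(*(app.refresh_schemas() for _ in range(5)))
    return app


app = asyncio.run(main())
```

With five concurrent callers, initialization runs once and the other four return early. One property of this design worth noting is that callers arriving mid-refresh skip rather than wait, so they carry on before the in-flight refresh has finished.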

@simonw
Owner Author

simonw commented Jul 16, 2021

The test suite passes with that change.

@simonw
Owner Author

simonw commented Jul 16, 2021

I can't replicate the race condition locally with or without this patch. I'm going to push the commit and then re-run the datasette-graphql CI job that was failing against it.

simonw added a commit to simonw/datasette-graphql that referenced this issue Jul 16, 2021
simonw added a commit to simonw/datasette-graphql that referenced this issue Jul 16, 2021
@simonw
Owner Author

simonw commented Jul 16, 2021

That fixed the race condition in the datasette-graphql tests, which is the only place that I've been able to successfully replicate this. I'm going to land this change.
