-
-
Notifications
You must be signed in to change notification settings - Fork 700
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
.execute_write() and .execute_write_fn() methods on Database #683
Conversation
Next steps are from comment #682 (comment)
|
I've been testing this out by running one-off demo plugins. I saved the following in a file called from datasette import hookimpl
import sqlite_utils
import time
class AsgiLogToSqliteViaWriteQueue:
lookup_columns = (
"path",
"user_agent",
"referer",
"accept_language",
"content_type",
"query_string",
)
def __init__(self, app, db):
self.app = app
self.db = db
self._tables_ensured = False
async def ensure_tables(self):
def _ensure_tables(conn):
db = sqlite_utils.Database(conn)
for column in self.lookup_columns:
table = "{}s".format(column)
if not db[table].exists():
db[table].create({"id": int, "name": str}, pk="id")
if "requests" not in db.table_names():
db["requests"].create(
{
"start": float,
"method": str,
"path": int,
"query_string": int,
"user_agent": int,
"referer": int,
"accept_language": int,
"http_status": int,
"content_type": int,
"client_ip": str,
"duration": float,
"body_size": int,
},
foreign_keys=self.lookup_columns,
)
await self.db.execute_write_fn(_ensure_tables)
async def __call__(self, scope, receive, send):
if not self._tables_ensured:
self._tables_ensured = True
await self.ensure_tables()
response_headers = []
body_size = 0
http_status = None
async def wrapped_send(message):
nonlocal body_size, response_headers, http_status
if message["type"] == "http.response.start":
response_headers = message["headers"]
http_status = message["status"]
if message["type"] == "http.response.body":
body_size += len(message["body"])
await send(message)
start = time.time()
await self.app(scope, receive, wrapped_send)
end = time.time()
path = str(scope["path"])
query_string = None
if scope.get("query_string"):
query_string = "?{}".format(scope["query_string"].decode("utf8"))
request_headers = dict(scope.get("headers") or [])
referer = header(request_headers, "referer")
user_agent = header(request_headers, "user-agent")
accept_language = header(request_headers, "accept-language")
content_type = header(dict(response_headers), "content-type")
def _log_to_database(conn):
db = sqlite_utils.Database(conn)
db["requests"].insert(
{
"start": start,
"method": scope["method"],
"path": lookup(db, "paths", path),
"query_string": lookup(db, "query_strings", query_string),
"user_agent": lookup(db, "user_agents", user_agent),
"referer": lookup(db, "referers", referer),
"accept_language": lookup(db, "accept_languages", accept_language),
"http_status": http_status,
"content_type": lookup(db, "content_types", content_type),
"client_ip": scope.get("client", (None, None))[0],
"duration": end - start,
"body_size": body_size,
},
alter=True,
foreign_keys=self.lookup_columns,
)
await self.db.execute_write_fn(_log_to_database)
def header(d, name):
return d.get(name.encode("utf8"), b"").decode("utf8") or None
def lookup(db, table, value):
return db[table].lookup({"name": value}) if value else None
@hookimpl
def asgi_wrapper(datasette):
def wrap_with_class(app):
return AsgiLogToSqliteViaWriteQueue(
app, next(iter(datasette.databases.values()))
)
return wrap_with_class |
I'm going to muck around with a couple more demo plugins - in particular one derived from datasette-upload-csvs - to make sure I'm comfortable with this API - then add a couple of tests and merge it with documentation that warns "this is still an experimental feature and may change". |
I'm not convinced by the return value of the datasette/datasette/database.py Lines 79 to 83 in ab23482
Do I really need that |
I think But is it weird having a function that returns different types depending on if you passed I'm OK with the |
Also: are UUIDs really necessary here or could I use a simpler form of task identifier? Like an in-memory counter variable that starts at 0 and increments every time this instance of Datasette issues a new task ID? The neat thing about UUIDs is that I don't have to worry if there are multiple Datasette instances accepting writes behind a load balancer. That seems pretty unlikely (especially considering SQLite databases encourage only one process to be writing at a time)... but I am experimenting with PostgreSQL support in #670 so it's probably worth ensuring these task IDs really are globally unique. I'm going to stick with UUIDs. They're short-lived enough that their size doesn't really matter. |
Another demo plugin: from datasette import hookimpl
from datasette.utils import escape_sqlite
from starlette.responses import HTMLResponse
from starlette.endpoints import HTTPEndpoint
class DeleteTableApp(HTTPEndpoint):
def __init__(self, scope, receive, send, datasette):
self.datasette = datasette
super().__init__(scope, receive, send)
async def post(self, request):
formdata = await request.form()
database = formdata["database"]
db = self.datasette.databases[database]
await db.execute_write("drop table {}".format(escape_sqlite(formdata["table"])))
return HTMLResponse("Table has been deleted.")
@hookimpl
def asgi_wrapper(datasette):
def wrap_with_asgi_auth(app):
async def wrapped_app(scope, recieve, send):
if scope["path"] == "/-/delete-table":
await DeleteTableApp(scope, recieve, send, datasette)
else:
await app(scope, recieve, send)
return wrapped_app
return wrap_with_asgi_auth Then I saved this as {% extends "default:table.html" %}
{% block content %}
<form action="/-/delete-table" method="POST">
<p>
<input type="hidden" name="database" value="{{ database }}">
<input type="hidden" name="table" value="{{ table }}">
<input type="submit" value="Delete this table">
</p>
</form>
{{ super() }}
{% endblock %} (Needs CSRF protection added) I ran Datasette like this:
Result: I can delete tables! |
Here's the from datasette import hookimpl
from starlette.responses import PlainTextResponse, HTMLResponse
from starlette.endpoints import HTTPEndpoint
import csv as csv_std
import codecs
import sqlite_utils
class UploadApp(HTTPEndpoint):
def __init__(self, scope, receive, send, datasette):
self.datasette = datasette
super().__init__(scope, receive, send)
def get_database(self):
# For the moment just use the first one that's not immutable
mutable = [db for db in self.datasette.databases.values() if db.is_mutable]
return mutable[0]
async def get(self, request):
return HTMLResponse(
await self.datasette.render_template(
"upload_csv.html", {"database_name": self.get_database().name}
)
)
async def post(self, request):
formdata = await request.form()
csv = formdata["csv"]
# csv.file is a SpooledTemporaryFile, I can read it directly
filename = csv.filename
# TODO: Support other encodings:
reader = csv_std.reader(codecs.iterdecode(csv.file, "utf-8"))
headers = next(reader)
docs = (dict(zip(headers, row)) for row in reader)
if filename.endswith(".csv"):
filename = filename[:-4]
# Import data into a table of that name using sqlite-utils
db = self.get_database()
def fn(conn):
writable_conn = sqlite_utils.Database(db.path)
writable_conn[filename].insert_all(docs, alter=True)
return writable_conn[filename].count
# Without block=True we may attempt 'select count(*) from ...'
# before the table has been created by the write thread
count = await db.execute_write_fn(fn, block=True)
return HTMLResponse(
await self.datasette.render_template(
"upload_csv_done.html",
{
"database": self.get_database().name,
"table": filename,
"num_docs": count,
},
)
)
@hookimpl
def asgi_wrapper(datasette):
def wrap_with_asgi_auth(app):
async def wrapped_app(scope, recieve, send):
if scope["path"] == "/-/upload-csv":
await UploadApp(scope, recieve, send, datasette)
else:
await app(scope, recieve, send)
return wrapped_app
return wrap_with_asgi_auth I also dropped copies of the two template files from https://github.com/simonw/datasette-upload-csvs/tree/699e6ca591f36264bfc8e590d877e6852f274beb/datasette_upload_csvs/templates into my |
I'm going to punt on the ability to introspect the write queue and poll for completion using a UUID for the moment. Can add those later. |
Basic stuff to cover in unit tests:
I'm going to write these tests directly against a |
The other problem with the poll-for-UUID-completion idea: how long does this mean Datasette needs to keep holding onto the Maybe we say you only get to ask "is this UUID still in the queue" and if the answer is "no" then you assume the task has been completed. |
This failing test is a nasty one - the whole thing just hangs (so I imagine Travis will run for a while before hopefully giving up). Here's what happens if I add
|
I'm happy with this now. I'm going to merge to master. |
See #682
.execute_write()
and.execute_write_fn()