-
Notifications
You must be signed in to change notification settings - Fork 7
/
async_table.py
194 lines (153 loc) · 5.64 KB
/
async_table.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
"""
About
=====

Example program to demonstrate how to connect to CrateDB using its SQLAlchemy
dialect, and exercise a few basic examples using the low-level table API, in
asynchronous mode.

Both the PostgreSQL drivers `asyncpg` and `psycopg` can be used.
The corresponding SQLAlchemy dialect identifiers are::

    # PostgreSQL protocol on port 5432, using `asyncpg`
    crate+asyncpg://crate@localhost:5432/doc

    # PostgreSQL protocol on port 5432, using `psycopg`
    crate+psycopg://crate@localhost:5432/doc

Synopsis
========
::

    # Run CrateDB
    docker run --rm -it --publish=4200:4200 --publish=5432:5432 crate

    # Use PostgreSQL protocol, with `asyncpg`
    python async_table.py asyncpg

    # Use PostgreSQL protocol, with asynchronous support of `psycopg`
    python async_table.py psycopg

    # Use with both variants
    python async_table.py asyncpg psycopg
"""
import asyncio
import sys
import typing as t
from functools import cached_property, lru_cache

import sqlalchemy as sa
from sqlalchemy.ext.asyncio import create_async_engine
class AsynchronousTableExample:
    """
    Demonstrate the CrateDB SQLAlchemy dialect in asynchronous mode,
    using the `asyncpg` and `psycopg` drivers.

    The engine and the table definition are created lazily on first access
    and then cached on the instance itself, so they are released together
    with the instance instead of being pinned by a class-level cache.
    """

    def __init__(self, dsn: str):
        """
        :param dsn: SQLAlchemy database URL to connect to, for example
                    ``crate+asyncpg://crate@localhost:5432/doc``.
        """
        self.dsn = dsn

    @cached_property
    def engine(self):
        """
        Provide an SQLAlchemy engine object.

        The asynchronous engine is created once per instance, using the
        ``AUTOCOMMIT`` isolation level and with statement echoing enabled.
        """
        return create_async_engine(self.dsn, isolation_level="AUTOCOMMIT", echo=True)

    @cached_property
    def table(self):
        """
        Provide an SQLAlchemy table object.

        The ``testdrive`` table has an integer primary key column ``x``
        (without autoincrement) and an integer column ``y``. It is created
        once per instance, bound to its own ``MetaData`` collection.
        """
        metadata = sa.MetaData()
        return sa.Table(
            "testdrive",
            metadata,
            sa.Column("x", sa.Integer, primary_key=True, autoincrement=False),
            sa.Column("y", sa.Integer),
        )

    async def conn_run_sync(self, func: t.Callable, *args, **kwargs):
        """
        Invoke a synchronous-style callable on a connection of the engine.

        To support SQLAlchemy DDL methods as well as legacy functions, the
        AsyncConnection.run_sync() awaitable method will pass a "sync"
        version of the AsyncConnection object to any synchronous method,
        where synchronous IO calls will be transparently translated for
        await.

        https://docs.sqlalchemy.org/en/20/_modules/asyncio/basic.html

        :param func: Callable receiving the "sync" connection as its first
                     argument.
        :param args: Additional positional arguments forwarded to `func`.
        :param kwargs: Additional keyword arguments forwarded to `func`.
        :return: Whatever `func` returns.
        """
        # `conn` is an instance of `AsyncConnection`
        async with self.engine.begin() as conn:
            return await conn.run_sync(func, *args, **kwargs)

    async def run(self):
        """
        Run the whole recipe, returning the result from the "read" step.
        """
        await self.create()
        await self.insert(sync=True)
        return await self.read()

    async def create(self):
        """
        Create table schema, completely dropping it upfront.
        """
        # `checkfirst=True` only drops the table when it already exists.
        await self.conn_run_sync(self.table.drop, checkfirst=True)
        await self.conn_run_sync(self.table.create)

    async def insert(self, sync: bool = False):
        """
        Write data to the database, taking CrateDB-specific `REFRESH TABLE` into account.

        Two records are inserted, ``(x=1, y=42)`` and ``(x=2, y=42)``.

        :param sync: When true, and the DSN starts with ``crate``, issue a
                     ``REFRESH TABLE`` statement after inserting.
        """
        async with self.engine.begin() as conn:
            stmt = self.table.insert().values(x=1, y=42)
            await conn.execute(stmt)
            stmt = self.table.insert().values(x=2, y=42)
            await conn.execute(stmt)
            # `REFRESH TABLE` is specific to CrateDB, so only emit it there.
            if sync and self.dsn.startswith("crate"):
                await conn.execute(sa.text("REFRESH TABLE testdrive;"))

    async def read(self):
        """
        Read data from the database.

        :return: All rows of the ``testdrive`` table, fetched eagerly.
        """
        async with self.engine.begin() as conn:
            cursor = await conn.execute(sa.text("SELECT * FROM testdrive;"))
            return cursor.fetchall()

    async def reflect(self):
        """
        Reflect the table schema from the database, and print it to stdout.
        """

        # Optionally enable tracing SQLAlchemy calls.
        # self.trace()

        def reflect_table(connection):
            """
            A function written in "synchronous" style that will be invoked
            within the asyncio event loop.

            The object passed in is the "sync" version of the connection, as
            handed over by `AsyncConnection.run_sync()`, so reflection can
            use the regular synchronous interface.

            https://docs.sqlalchemy.org/en/20/_modules/asyncio/greenlet_orm.html
            """
            meta = sa.MetaData()
            reflected_table = sa.Table("testdrive", meta, autoload_with=connection)
            print("Table information:")
            print(f"Table: {reflected_table}")
            print(f"Columns: {reflected_table.columns}")
            print(f"Constraints: {reflected_table.constraints}")
            print(f"Primary key: {reflected_table.primary_key}")

        return await self.conn_run_sync(reflect_table)

    @staticmethod
    def trace():
        """
        Trace execution flow through SQLAlchemy.

        Requires the optional `hunter` package, which is why it is imported
        lazily here rather than at module level.

        pip install hunter
        """
        from hunter import Q, trace

        constraint = Q(module_startswith="sqlalchemy")
        trace(constraint)
async def run_example(dsn: str):
    """
    Run the complete example against the database designated by `dsn`.

    Creates and populates the table, prints the rows read back, and finally
    prints the reflected table schema.

    :param dsn: SQLAlchemy database URL to connect to.
    """
    example = AsynchronousTableExample(dsn)

    # Resolve the engine before entering the `try` block, so a failure to
    # create it is reported as-is rather than again from the cleanup step.
    engine = example.engine

    try:
        # Run a basic conversation.
        # It also includes a catalog inquiry at `table.drop(checkfirst=True)`.
        result = await example.run()
        print(result)

        # Reflect the table schema.
        await example.reflect()
    finally:
        # Close pooled connections while the event loop is still running.
        # Otherwise they would outlive the loop started by `asyncio.run()`.
        await engine.dispose()
def run_drivers(drivers: t.List[str]):
    """
    Run the example once per requested driver, in the given order.

    :param drivers: Driver names; `asyncpg` and `psycopg` are recognized.
    :raises ValueError: On the first unrecognized driver name. Drivers listed
                        before it have already been run at that point.
    """
    # Recognized driver names, each paired with its connection URL.
    known_dsns = (
        ("asyncpg", "crate+asyncpg://crate@localhost:5432/doc"),
        ("psycopg", "crate+psycopg://crate@localhost:5432/doc"),
    )
    for name in drivers:
        url = next((dsn for key, dsn in known_dsns if name == key), None)
        if url is None:
            raise ValueError(f"Unknown driver: {name}")
        asyncio.run(run_example(url))
if __name__ == "__main__":
    # Each positional command-line argument names one driver to exercise.
    requested = sys.argv[1:]
    if len(requested) == 0:
        raise ValueError("Please select driver")
    run_drivers(requested)