Skip to content

Commit d80cb27

Browse files
author
Ilya Gurov
authored
feat: support stale reads (#146)
1 parent 5253233 commit d80cb27

File tree

3 files changed

+69
-3
lines changed

3 files changed

+69
-3
lines changed

README.md

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,45 @@ Note that execution options are applied lazily - on the `execute()` method call,
190190

191191
ReadOnly/ReadWrite mode of a connection can't be changed while a transaction is in progress - first you must commit or rollback it.
192192

193+
### Stale reads
194+
To use the Spanner [Stale Reads](https://cloud.google.com/spanner/docs/reads#perform-stale-read) with SQLAlchemy you can tweak the connection execution options with a wanted staleness value. For example:
195+
```python
196+
# maximum staleness
197+
with engine.connect().execution_options(
198+
read_only=True,
199+
staleness={"max_staleness": datetime.timedelta(seconds=5)}
200+
) as connection:
201+
connection.execute(select(["*"], from_obj=table)).fetchall()
202+
```
203+
204+
```python
205+
# exact staleness
206+
with engine.connect().execution_options(
207+
read_only=True,
208+
staleness={"exact_staleness": datetime.timedelta(seconds=5)}
209+
) as connection:
210+
connection.execute(select(["*"], from_obj=table)).fetchall()
211+
```
212+
213+
```python
214+
# min read timestamp
215+
with engine.connect().execution_options(
216+
read_only=True,
217+
staleness={"min_read_timestamp": datetime.datetime(2021, 11, 17, 12, 55, 30)}
218+
) as connection:
219+
connection.execute(select(["*"], from_obj=table)).fetchall()
220+
```
221+
222+
```python
223+
# read timestamp
224+
with engine.connect().execution_options(
225+
read_only=True,
226+
staleness={"read_timestamp": datetime.datetime(2021, 11, 17, 12, 55, 30)}
227+
) as connection:
228+
connection.execute(select(["*"], from_obj=table)).fetchall()
229+
```
230+
Note that the set option will be dropped when the connection is returned back to the pool.
231+
193232
### DDL and transactions
194233
DDL statements are executed outside the regular transactions mechanism, which means DDL statements will not be rolled back on normal transaction rollback.
195234

google/cloud/sqlalchemy_spanner/sqlalchemy_spanner.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@
2525
from sqlalchemy import ForeignKeyConstraint, types, util
2626
from sqlalchemy.engine.base import Engine
2727
from sqlalchemy.engine.default import DefaultDialect, DefaultExecutionContext
28+
from sqlalchemy.event import listens_for
2829
from sqlalchemy.ext.compiler import compiles
30+
from sqlalchemy.pool import Pool
2931
from sqlalchemy.sql.compiler import (
3032
selectable,
3133
DDLCompiler,
@@ -38,6 +40,13 @@
3840
from google.cloud import spanner_dbapi
3941
from google.cloud.sqlalchemy_spanner._opentelemetry_tracing import trace_call
4042

43+
44+
@listens_for(Pool, "reset")
45+
def reset_connection(dbapi_conn, connection_record):
46+
"""An event of returning a connection back to a pool."""
47+
dbapi_conn.connection.staleness = None
48+
49+
4150
# Spanner-to-SQLAlchemy types map
4251
_type_map = {
4352
"BOOL": types.Boolean,
@@ -128,6 +137,10 @@ def pre_exec(self):
128137
if read_only is not None:
129138
self._dbapi_connection.connection.read_only = read_only
130139

140+
staleness = self.execution_options.get("staleness", None)
141+
if staleness is not None:
142+
self._dbapi_connection.connection.staleness = staleness
143+
131144

132145
class SpannerIdentifierPreparer(IdentifierPreparer):
133146
"""Identifiers compiler.

test/test_suite.py

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
# See the License for the specific language governing permissions and
1515
# limitations under the License.
1616

17-
from datetime import timezone
17+
import datetime
1818
import decimal
1919
import operator
2020
import os
@@ -975,7 +975,9 @@ def test_row_w_scalar_select(self):
975975

976976
eq_(
977977
row["somelabel"],
978-
DatetimeWithNanoseconds(2006, 5, 12, 12, 0, 0, tzinfo=timezone.utc),
978+
DatetimeWithNanoseconds(
979+
2006, 5, 12, 12, 0, 0, tzinfo=datetime.timezone.utc
980+
),
979981
)
980982

981983

@@ -1578,7 +1580,7 @@ class ExecutionOptionsTest(fixtures.TestBase):
15781580
"""
15791581

15801582
def setUp(self):
1581-
self._engine = create_engine(get_db_url())
1583+
self._engine = create_engine(get_db_url(), pool_size=1)
15821584
self._metadata = MetaData(bind=self._engine)
15831585

15841586
self._table = Table(
@@ -1594,3 +1596,15 @@ def test_read_only(self):
15941596
with self._engine.connect().execution_options(read_only=True) as connection:
15951597
connection.execute(select(["*"], from_obj=self._table)).fetchall()
15961598
assert connection.connection.read_only is True
1599+
1600+
def test_staleness(self):
1601+
with self._engine.connect().execution_options(
1602+
read_only=True, staleness={"max_staleness": datetime.timedelta(seconds=5)}
1603+
) as connection:
1604+
connection.execute(select(["*"], from_obj=self._table)).fetchall()
1605+
assert connection.connection.staleness == {
1606+
"max_staleness": datetime.timedelta(seconds=5)
1607+
}
1608+
1609+
with self._engine.connect() as connection:
1610+
assert connection.connection.staleness is None

0 commit comments

Comments
 (0)