Skip to content

Commit 6e5ac1f

Browse files
committed
docs: add sample for stale reads
Adds a sample and tests for executing stale reads on Spanner. Using stale reads can improve performance when the application does not require the guarantees that are given by strong reads. Fixes #495
1 parent 3944cea commit 6e5ac1f

File tree

4 files changed

+305
-0
lines changed

4 files changed

+305
-0
lines changed

samples/noxfile.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,11 @@ def transaction(session):
5757
_sample(session)
5858

5959

60+
@nox.session()
61+
def stale_read(session):
62+
_sample(session)
63+
64+
6065
@nox.session()
6166
def _all_samples(session):
6267
_sample(session)

samples/stale_read_sample.py

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
# Copyright 2024 Google LLC All rights reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import uuid
16+
from sqlalchemy import create_engine, Engine, select, text
17+
from sqlalchemy.orm import Session
18+
from sample_helper import run_sample
19+
from model import Singer
20+
21+
22+
# Shows how to execute stale reads on Spanner using SQLAlchemy.
23+
def stale_read_sample():
24+
engine = create_engine(
25+
"spanner:///projects/sample-project/"
26+
"instances/sample-instance/"
27+
"databases/sample-database",
28+
echo=True,
29+
)
30+
# First get the current database timestamp. We can use this timestamp to
31+
# query the database at a point in time where we know it was empty.
32+
with Session(engine.execution_options(isolation_level="AUTOCOMMIT")) as session:
33+
timestamp = session.execute(select(text("current_timestamp"))).one()[0]
34+
print(timestamp)
35+
36+
# Insert a few test rows.
37+
insert_test_data(engine)
38+
39+
# Create a session that uses a read-only transaction with a strong timestamp
40+
# bound. This means that it will read all data that has been committed at the
41+
# time this transaction starts.
42+
# Read-only transactions do not take locks, and are therefore preferred
43+
# above read/write transactions for workloads that only read data on Spanner.
44+
with Session(engine.execution_options(read_only=True)) as session:
45+
print("Found singers with strong timestamp bound:")
46+
singers = session.query(Singer).order_by(Singer.last_name).all()
47+
for singer in singers:
48+
print("Singer: ", singer.full_name)
49+
50+
# Create a session that uses a read-only transaction that selects data in
51+
# the past. We'll use the timestamp that we retrieved before inserting the
52+
# test data for this transaction.
53+
with Session(
54+
engine.execution_options(
55+
read_only=True, staleness={"read_timestamp": timestamp}
56+
)
57+
) as session:
58+
print("Searching for singers using a read timestamp in the past:")
59+
singers = session.query(Singer).order_by(Singer.last_name).all()
60+
if singers:
61+
for singer in singers:
62+
print("Singer: ", singer.full_name)
63+
else:
64+
print("No singers found.")
65+
66+
# Spanner also supports min_read_timestamp and max_staleness as staleness
67+
# options. These can only be used in auto-commit mode.
68+
# Spanner will choose a read timestamp that satisfies the given restriction
69+
# and that can be served as efficiently as possible.
70+
with Session(
71+
engine.execution_options(
72+
isolation_level="AUTOCOMMIT", staleness={"max_staleness": {"seconds": 15}}
73+
)
74+
) as session:
75+
print("Searching for singers using a max staleness of 15 seconds:")
76+
singers = session.query(Singer).order_by(Singer.last_name).all()
77+
if singers:
78+
for singer in singers:
79+
print("Singer: ", singer.full_name)
80+
else:
81+
print("No singers found.")
82+
83+
84+
def insert_test_data(engine: Engine):
85+
with Session(engine) as session:
86+
session.add_all(
87+
[
88+
Singer(id=str(uuid.uuid4()), first_name="John", last_name="Doe"),
89+
Singer(id=str(uuid.uuid4()), first_name="Jane", last_name="Doe"),
90+
]
91+
)
92+
session.commit()
93+
94+
95+
if __name__ == "__main__":
96+
run_sample(stale_read_sample)
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
# Copyright 2024 Google LLC All rights reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from sqlalchemy import String, BigInteger
16+
from sqlalchemy.orm import DeclarativeBase
17+
from sqlalchemy.orm import Mapped
18+
from sqlalchemy.orm import mapped_column
19+
20+
21+
class Base(DeclarativeBase):
22+
pass
23+
24+
25+
class Singer(Base):
26+
__tablename__ = "singers"
27+
id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
28+
name: Mapped[str] = mapped_column(String)
Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
# Copyright 2024 Google LLC All rights reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import datetime
16+
from sqlalchemy import create_engine, select
17+
from sqlalchemy.orm import Session
18+
from sqlalchemy.testing import eq_, is_instance_of
19+
from google.cloud.spanner_v1 import (
20+
FixedSizePool,
21+
BatchCreateSessionsRequest,
22+
ExecuteSqlRequest,
23+
GetSessionRequest,
24+
BeginTransactionRequest,
25+
TransactionOptions,
26+
)
27+
from test.mockserver_tests.mock_server_test_base import MockServerTestBase
28+
from test.mockserver_tests.mock_server_test_base import add_result
29+
import google.cloud.spanner_v1.types.type as spanner_type
30+
import google.cloud.spanner_v1.types.result_set as result_set
31+
32+
33+
class TestStaleReads(MockServerTestBase):
34+
def test_stale_read_multi_use(self):
35+
from test.mockserver_tests.stale_read_model import Singer
36+
37+
add_singer_query_result("SELECT singers.id, singers.name \n" + "FROM singers")
38+
engine = create_engine(
39+
"spanner:///projects/p/instances/i/databases/d",
40+
echo=True,
41+
connect_args={"client": self.client, "pool": FixedSizePool(size=10)},
42+
)
43+
44+
timestamp = datetime.datetime.fromtimestamp(1733328910)
45+
for i in range(2):
46+
with Session(
47+
engine.execution_options(
48+
read_only=True,
49+
staleness={"read_timestamp": timestamp},
50+
)
51+
) as session:
52+
# Execute two queries in a read-only transaction.
53+
session.scalars(select(Singer)).all()
54+
session.scalars(select(Singer)).all()
55+
56+
# Verify the requests that we got.
57+
requests = self.spanner_service.requests
58+
eq_(9, len(requests))
59+
is_instance_of(requests[0], BatchCreateSessionsRequest)
60+
# We should get rid of this extra round-trip for GetSession....
61+
is_instance_of(requests[1], GetSessionRequest)
62+
is_instance_of(requests[2], BeginTransactionRequest)
63+
is_instance_of(requests[3], ExecuteSqlRequest)
64+
is_instance_of(requests[4], ExecuteSqlRequest)
65+
is_instance_of(requests[5], GetSessionRequest)
66+
is_instance_of(requests[6], BeginTransactionRequest)
67+
is_instance_of(requests[7], ExecuteSqlRequest)
68+
is_instance_of(requests[8], ExecuteSqlRequest)
69+
# Verify that the transaction is a read-only transaction.
70+
for index in [2, 6]:
71+
begin_request: BeginTransactionRequest = requests[index]
72+
eq_(
73+
TransactionOptions(
74+
dict(
75+
read_only=TransactionOptions.ReadOnly(
76+
dict(
77+
read_timestamp={"seconds": 1733328910},
78+
return_read_timestamp=True,
79+
)
80+
)
81+
)
82+
),
83+
begin_request.options,
84+
)
85+
86+
def test_stale_read_single_use(self):
87+
from test.mockserver_tests.stale_read_model import Singer
88+
89+
add_singer_query_result("SELECT singers.id, singers.name\n" + "FROM singers")
90+
engine = create_engine(
91+
"spanner:///projects/p/instances/i/databases/d",
92+
echo=True,
93+
connect_args={"client": self.client, "pool": FixedSizePool(size=10)},
94+
)
95+
96+
with Session(
97+
engine.execution_options(
98+
isolation_level="AUTOCOMMIT",
99+
staleness={"max_staleness": {"seconds": 15}},
100+
)
101+
) as session:
102+
# Execute two queries in autocommit.
103+
session.scalars(select(Singer)).all()
104+
session.scalars(select(Singer)).all()
105+
106+
# Verify the requests that we got.
107+
requests = self.spanner_service.requests
108+
eq_(5, len(requests))
109+
is_instance_of(requests[0], BatchCreateSessionsRequest)
110+
# We should get rid of this extra round-trip for GetSession....
111+
is_instance_of(requests[1], GetSessionRequest)
112+
is_instance_of(requests[2], ExecuteSqlRequest)
113+
is_instance_of(requests[3], GetSessionRequest)
114+
is_instance_of(requests[4], ExecuteSqlRequest)
115+
# Verify that the requests use a stale read.
116+
for index in [2, 4]:
117+
execute_request: ExecuteSqlRequest = requests[index]
118+
eq_(
119+
TransactionOptions(
120+
dict(
121+
read_only=TransactionOptions.ReadOnly(
122+
dict(
123+
max_staleness={"seconds": 15},
124+
return_read_timestamp=True,
125+
)
126+
)
127+
)
128+
),
129+
execute_request.transaction.single_use,
130+
)
131+
132+
133+
def add_singer_query_result(sql: str):
134+
result = result_set.ResultSet(
135+
dict(
136+
metadata=result_set.ResultSetMetadata(
137+
dict(
138+
row_type=spanner_type.StructType(
139+
dict(
140+
fields=[
141+
spanner_type.StructType.Field(
142+
dict(
143+
name="singers_id",
144+
type=spanner_type.Type(
145+
dict(code=spanner_type.TypeCode.INT64)
146+
),
147+
)
148+
),
149+
spanner_type.StructType.Field(
150+
dict(
151+
name="singers_name",
152+
type=spanner_type.Type(
153+
dict(code=spanner_type.TypeCode.STRING)
154+
),
155+
)
156+
),
157+
]
158+
)
159+
)
160+
)
161+
),
162+
)
163+
)
164+
result.rows.extend(
165+
[
166+
(
167+
"1",
168+
"Jane Doe",
169+
),
170+
(
171+
"2",
172+
"John Doe",
173+
),
174+
]
175+
)
176+
add_result(sql, result)

0 commit comments

Comments
 (0)