Skip to content

Commit 686bda6

Browse files
authored
chore(x-goog-request-id): commit testing scaffold (#1366)
* chore(x-goog-request-id): commit testing scaffold This change commits the scaffolding for which testing will be used. This is a carve out of PRs #1264 and #1364, meant to make those changes lighter and much easier to review then merge. Updates #1261 * Use guard to keep x-goog-request-id interceptor docile in tests until activation later * AtomicCounter update * Remove duplicate unavailable_status that had been already committed into main
1 parent e53eaa2 commit 686bda6

File tree

8 files changed

+163
-7
lines changed

8 files changed

+163
-7
lines changed

google/cloud/spanner_v1/_helpers.py

+4
Original file line numberDiff line numberDiff line change
@@ -707,6 +707,10 @@ def __radd__(self, n):
707707
"""
708708
return self.__add__(n)
709709

710+
def reset(self):
711+
with self.__lock:
712+
self.__value = 0
713+
710714

711715
def _metadata_with_request_id(*args, **kwargs):
712716
return with_request_id(*args, **kwargs)

google/cloud/spanner_v1/client.py

+9
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@
7070
except ImportError: # pragma: NO COVER
7171
HAS_GOOGLE_CLOUD_MONITORING_INSTALLED = False
7272

73+
from google.cloud.spanner_v1._helpers import AtomicCounter
7374

7475
_CLIENT_INFO = client_info.ClientInfo(client_library_version=__version__)
7576
EMULATOR_ENV_VAR = "SPANNER_EMULATOR_HOST"
@@ -182,6 +183,8 @@ class Client(ClientWithProject):
182183
SCOPE = (SPANNER_ADMIN_SCOPE,)
183184
"""The scopes required for Google Cloud Spanner."""
184185

186+
NTH_CLIENT = AtomicCounter()
187+
185188
def __init__(
186189
self,
187190
project=None,
@@ -263,6 +266,12 @@ def __init__(
263266
"default_transaction_options must be an instance of DefaultTransactionOptions"
264267
)
265268
self._default_transaction_options = default_transaction_options
269+
self._nth_client_id = Client.NTH_CLIENT.increment()
270+
self._nth_request = AtomicCounter(0)
271+
272+
@property
273+
def _next_nth_request(self):
274+
return self._nth_request.increment()
266275

267276
@property
268277
def credentials(self):

google/cloud/spanner_v1/request_id_header.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,6 @@ def generate_rand_uint64():
3737

3838
def with_request_id(client_id, channel_id, nth_request, attempt, other_metadata=[]):
3939
req_id = f"{REQ_ID_VERSION}.{REQ_RAND_PROCESS_ID}.{client_id}.{channel_id}.{nth_request}.{attempt}"
40-
all_metadata = other_metadata.copy()
40+
all_metadata = (other_metadata or []).copy()
4141
all_metadata.append((REQ_ID_HEADER_KEY, req_id))
4242
return all_metadata

google/cloud/spanner_v1/testing/database_test.py

+9
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
from google.cloud.spanner_v1.testing.interceptors import (
2626
MethodCountInterceptor,
2727
MethodAbortInterceptor,
28+
XGoogRequestIDHeaderInterceptor,
2829
)
2930

3031

@@ -34,6 +35,8 @@ class TestDatabase(Database):
3435
currently, and we don't want to make changes in the Database class for
3536
testing purpose as this is a hack to use interceptors in tests."""
3637

38+
_interceptors = []
39+
3740
def __init__(
3841
self,
3942
database_id,
@@ -74,6 +77,8 @@ def spanner_api(self):
7477
client_options = client._client_options
7578
if self._instance.emulator_host is not None:
7679
channel = grpc.insecure_channel(self._instance.emulator_host)
80+
self._x_goog_request_id_interceptor = XGoogRequestIDHeaderInterceptor()
81+
self._interceptors.append(self._x_goog_request_id_interceptor)
7782
channel = grpc.intercept_channel(channel, *self._interceptors)
7883
transport = SpannerGrpcTransport(channel=channel)
7984
self._spanner_api = SpannerClient(
@@ -110,3 +115,7 @@ def _create_spanner_client_for_tests(self, client_options, credentials):
110115
client_options=client_options,
111116
transport=transport,
112117
)
118+
119+
def reset(self):
120+
if self._x_goog_request_id_interceptor:
121+
self._x_goog_request_id_interceptor.reset()

google/cloud/spanner_v1/testing/interceptors.py

+71
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
# limitations under the License.
1414

1515
from collections import defaultdict
16+
import threading
17+
1618
from grpc_interceptor import ClientInterceptor
1719
from google.api_core.exceptions import Aborted
1820

@@ -63,3 +65,72 @@ def reset(self):
6365
self._method_to_abort = None
6466
self._count = 0
6567
self._connection = None
68+
69+
70+
X_GOOG_REQUEST_ID = "x-goog-spanner-request-id"
71+
72+
73+
class XGoogRequestIDHeaderInterceptor(ClientInterceptor):
74+
# TODO:(@odeke-em): delete this guard when PR #1367 is merged.
75+
X_GOOG_REQUEST_ID_FUNCTIONALITY_MERGED = False
76+
77+
def __init__(self):
78+
self._unary_req_segments = []
79+
self._stream_req_segments = []
80+
self.__lock = threading.Lock()
81+
82+
def intercept(self, method, request_or_iterator, call_details):
83+
metadata = call_details.metadata
84+
x_goog_request_id = None
85+
for key, value in metadata:
86+
if key == X_GOOG_REQUEST_ID:
87+
x_goog_request_id = value
88+
break
89+
90+
if self.X_GOOG_REQUEST_ID_FUNCTIONALITY_MERGED and not x_goog_request_id:
91+
raise Exception(
92+
f"Missing {X_GOOG_REQUEST_ID} header in {call_details.method}"
93+
)
94+
95+
response_or_iterator = method(request_or_iterator, call_details)
96+
streaming = getattr(response_or_iterator, "__iter__", None) is not None
97+
98+
if self.X_GOOG_REQUEST_ID_FUNCTIONALITY_MERGED:
99+
with self.__lock:
100+
if streaming:
101+
self._stream_req_segments.append(
102+
(call_details.method, parse_request_id(x_goog_request_id))
103+
)
104+
else:
105+
self._unary_req_segments.append(
106+
(call_details.method, parse_request_id(x_goog_request_id))
107+
)
108+
109+
return response_or_iterator
110+
111+
@property
112+
def unary_request_ids(self):
113+
return self._unary_req_segments
114+
115+
@property
116+
def stream_request_ids(self):
117+
return self._stream_req_segments
118+
119+
def reset(self):
120+
self._stream_req_segments.clear()
121+
self._unary_req_segments.clear()
122+
123+
124+
def parse_request_id(request_id_str):
125+
splits = request_id_str.split(".")
126+
version, rand_process_id, client_id, channel_id, nth_request, nth_attempt = list(
127+
map(lambda v: int(v), splits)
128+
)
129+
return (
130+
version,
131+
rand_process_id,
132+
client_id,
133+
channel_id,
134+
nth_request,
135+
nth_attempt,
136+
)

google/cloud/spanner_v1/testing/mock_spanner.py

+2-5
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@
2222
from google.cloud.spanner_v1 import (
2323
TransactionOptions,
2424
ResultSetMetadata,
25-
ExecuteSqlRequest,
26-
ExecuteBatchDmlRequest,
2725
)
2826
from google.cloud.spanner_v1.testing.mock_database_admin import DatabaseAdminServicer
2927
import google.cloud.spanner_v1.testing.spanner_database_admin_pb2_grpc as database_admin_grpc
@@ -107,6 +105,7 @@ def CreateSession(self, request, context):
107105

108106
def BatchCreateSessions(self, request, context):
109107
self._requests.append(request)
108+
self.mock_spanner.pop_error(context)
110109
sessions = []
111110
for i in range(request.session_count):
112111
sessions.append(
@@ -186,9 +185,7 @@ def BeginTransaction(self, request, context):
186185
self._requests.append(request)
187186
return self.__create_transaction(request.session, request.options)
188187

189-
def __maybe_create_transaction(
190-
self, request: ExecuteSqlRequest | ExecuteBatchDmlRequest
191-
):
188+
def __maybe_create_transaction(self, request):
192189
started_transaction = None
193190
if not request.transaction.begin == TransactionOptions():
194191
started_transaction = self.__create_transaction(

tests/mockserver_tests/mock_server_test_base.py

+4-1
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ def setup_class(cls):
153153
def teardown_class(cls):
154154
if MockServerTestBase.server is not None:
155155
MockServerTestBase.server.stop(grace=None)
156+
Client.NTH_CLIENT.reset()
156157
MockServerTestBase.server = None
157158

158159
def setup_method(self, *args, **kwargs):
@@ -186,6 +187,8 @@ def instance(self) -> Instance:
186187
def database(self) -> Database:
187188
if self._database is None:
188189
self._database = self.instance.database(
189-
"test-database", pool=FixedSizePool(size=10)
190+
"test-database",
191+
pool=FixedSizePool(size=10),
192+
enable_interceptors_in_tests=True,
190193
)
191194
return self._database

tests/unit/test_transaction.py

+63
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@
2121
from google.cloud.spanner_v1 import TypeCode
2222
from google.api_core.retry import Retry
2323
from google.api_core import gapic_v1
24+
from google.cloud.spanner_v1._helpers import (
25+
AtomicCounter,
26+
_metadata_with_request_id,
27+
)
2428

2529
from tests._helpers import (
2630
HAS_OPENTELEMETRY_INSTALLED,
@@ -197,6 +201,11 @@ def test_begin_ok(self):
197201
[
198202
("google-cloud-resource-prefix", database.name),
199203
("x-goog-spanner-route-to-leader", "true"),
204+
# TODO(@odeke-em): enable with PR #1367.
205+
# (
206+
# "x-goog-spanner-request-id",
207+
# f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1",
208+
# ),
200209
],
201210
)
202211

@@ -301,6 +310,11 @@ def test_rollback_ok(self):
301310
[
302311
("google-cloud-resource-prefix", database.name),
303312
("x-goog-spanner-route-to-leader", "true"),
313+
# TODO(@odeke-em): enable with PR #1367.
314+
# (
315+
# "x-goog-spanner-request-id",
316+
# f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1",
317+
# ),
304318
],
305319
)
306320

@@ -492,6 +506,11 @@ def _commit_helper(
492506
[
493507
("google-cloud-resource-prefix", database.name),
494508
("x-goog-spanner-route-to-leader", "true"),
509+
# TODO(@odeke-em): enable with PR #1367.
510+
# (
511+
# "x-goog-spanner-request-id",
512+
# f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1",
513+
# ),
495514
],
496515
)
497516
self.assertEqual(actual_request_options, expected_request_options)
@@ -666,6 +685,11 @@ def _execute_update_helper(
666685
metadata=[
667686
("google-cloud-resource-prefix", database.name),
668687
("x-goog-spanner-route-to-leader", "true"),
688+
# TODO(@odeke-em): enable with PR #1367.
689+
# (
690+
# "x-goog-spanner-request-id",
691+
# f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1",
692+
# ),
669693
],
670694
)
671695

@@ -859,6 +883,11 @@ def _batch_update_helper(
859883
metadata=[
860884
("google-cloud-resource-prefix", database.name),
861885
("x-goog-spanner-route-to-leader", "true"),
886+
# TODO(@odeke-em): enable with PR #1367.
887+
# (
888+
# "x-goog-spanner-request-id",
889+
# f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1",
890+
# ),
862891
],
863892
retry=retry,
864893
timeout=timeout,
@@ -974,6 +1003,11 @@ def test_context_mgr_success(self):
9741003
[
9751004
("google-cloud-resource-prefix", database.name),
9761005
("x-goog-spanner-route-to-leader", "true"),
1006+
# TODO(@odeke-em): enable with PR #1367.
1007+
# (
1008+
# "x-goog-spanner-request-id",
1009+
# f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.2.1",
1010+
# ),
9771011
],
9781012
)
9791013

@@ -1004,11 +1038,19 @@ def test_context_mgr_failure(self):
10041038

10051039

10061040
class _Client(object):
1041+
NTH_CLIENT = AtomicCounter()
1042+
10071043
def __init__(self):
10081044
from google.cloud.spanner_v1 import ExecuteSqlRequest
10091045

10101046
self._query_options = ExecuteSqlRequest.QueryOptions(optimizer_version="1")
10111047
self.directed_read_options = None
1048+
self._nth_client_id = _Client.NTH_CLIENT.increment()
1049+
self._nth_request = AtomicCounter()
1050+
1051+
@property
1052+
def _next_nth_request(self):
1053+
return self._nth_request.increment()
10121054

10131055

10141056
class _Instance(object):
@@ -1024,6 +1066,27 @@ def __init__(self):
10241066
self._directed_read_options = None
10251067
self.default_transaction_options = DefaultTransactionOptions()
10261068

1069+
@property
1070+
def _next_nth_request(self):
1071+
return self._instance._client._next_nth_request
1072+
1073+
@property
1074+
def _nth_client_id(self):
1075+
return self._instance._client._nth_client_id
1076+
1077+
def metadata_with_request_id(self, nth_request, nth_attempt, prior_metadata=[]):
1078+
return _metadata_with_request_id(
1079+
self._nth_client_id,
1080+
self._channel_id,
1081+
nth_request,
1082+
nth_attempt,
1083+
prior_metadata,
1084+
)
1085+
1086+
@property
1087+
def _channel_id(self):
1088+
return 1
1089+
10271090

10281091
class _Session(object):
10291092
_transaction = None

0 commit comments

Comments
 (0)