Skip to content

Commit 423e5bc

Browse files
committed
fix(tracing): ensure nesting of Transaction.begin under commit + fix suggestions from feature review
This change ensures that: * If a transaction has not yet begun and .commit() is invoked, the resulting span hierarchy has .begin nested under .commit * We use "CloudSpanner.Transaction.execute_sql" instead of "CloudSpanner.Transaction.execute_streaming_sql" * If we have a tracer_provider that produces non-recording spans, it won't crash because the span lacks `span._status` Fixes #1286
1 parent 04a11a6 commit 423e5bc

File tree

9 files changed

+403
-54
lines changed

9 files changed

+403
-54
lines changed

google/cloud/spanner_v1/_opentelemetry_tracing.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,10 @@ def trace_call(name, session=None, extra_attributes=None, observability_options=
117117
# invoke .record_exception on our own else we shall have 2 exceptions.
118118
raise
119119
else:
120-
if (not span._status) or span._status.status_code == StatusCode.UNSET:
120+
# All spans still have set_status available even if for example
121+
# NonRecordingSpan doesn't have "_status".
122+
absent_span_status = getattr(span, "_status", None) is None
123+
if absent_span_status or span._status.status_code == StatusCode.UNSET:
121124
# OpenTelemetry-Python only allows a status change
122125
# if the current code is UNSET or ERROR. At the end
123126
# of the generator's consumption, only set it to OK

google/cloud/spanner_v1/snapshot.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -583,7 +583,7 @@ def _get_streamed_result_set(
583583
iterator = _restart_on_unavailable(
584584
restart,
585585
request,
586-
f"CloudSpanner.{type(self).__name__}.execute_streaming_sql",
586+
f"CloudSpanner.{type(self).__name__}.execute_sql",
587587
self._session,
588588
trace_attributes,
589589
transaction=self,

google/cloud/spanner_v1/transaction.py

Lines changed: 34 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -242,39 +242,7 @@ def commit(
242242
:returns: timestamp of the committed changes.
243243
:raises ValueError: if there are no mutations to commit.
244244
"""
245-
self._check_state()
246-
if self._transaction_id is None and len(self._mutations) > 0:
247-
self.begin()
248-
elif self._transaction_id is None and len(self._mutations) == 0:
249-
raise ValueError("Transaction is not begun")
250-
251245
database = self._session._database
252-
api = database.spanner_api
253-
metadata = _metadata_with_prefix(database.name)
254-
if database._route_to_leader_enabled:
255-
metadata.append(
256-
_metadata_with_leader_aware_routing(database._route_to_leader_enabled)
257-
)
258-
259-
if request_options is None:
260-
request_options = RequestOptions()
261-
elif type(request_options) is dict:
262-
request_options = RequestOptions(request_options)
263-
if self.transaction_tag is not None:
264-
request_options.transaction_tag = self.transaction_tag
265-
266-
# Request tags are not supported for commit requests.
267-
request_options.request_tag = None
268-
269-
request = CommitRequest(
270-
session=self._session.name,
271-
mutations=self._mutations,
272-
transaction_id=self._transaction_id,
273-
return_commit_stats=return_commit_stats,
274-
max_commit_delay=max_commit_delay,
275-
request_options=request_options,
276-
)
277-
278246
trace_attributes = {"num_mutations": len(self._mutations)}
279247
observability_options = getattr(database, "observability_options", None)
280248
with trace_call(
@@ -283,6 +251,40 @@ def commit(
283251
trace_attributes,
284252
observability_options,
285253
) as span:
254+
self._check_state()
255+
if self._transaction_id is None and len(self._mutations) > 0:
256+
self.begin()
257+
elif self._transaction_id is None and len(self._mutations) == 0:
258+
raise ValueError("Transaction is not begun")
259+
260+
api = database.spanner_api
261+
metadata = _metadata_with_prefix(database.name)
262+
if database._route_to_leader_enabled:
263+
metadata.append(
264+
_metadata_with_leader_aware_routing(
265+
database._route_to_leader_enabled
266+
)
267+
)
268+
269+
if request_options is None:
270+
request_options = RequestOptions()
271+
elif type(request_options) is dict:
272+
request_options = RequestOptions(request_options)
273+
if self.transaction_tag is not None:
274+
request_options.transaction_tag = self.transaction_tag
275+
276+
# Request tags are not supported for commit requests.
277+
request_options.request_tag = None
278+
279+
request = CommitRequest(
280+
session=self._session.name,
281+
mutations=self._mutations,
282+
transaction_id=self._transaction_id,
283+
return_commit_stats=return_commit_stats,
284+
max_commit_delay=max_commit_delay,
285+
request_options=request_options,
286+
)
287+
286288
add_span_event(span, "Starting Commit")
287289

288290
method = functools.partial(

tests/_helpers.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,3 +132,20 @@ def get_finished_spans(self):
132132

133133
def reset(self):
134134
self.tearDown()
135+
136+
def finished_spans_events_statuses(self):
137+
span_list = self.get_finished_spans()
138+
# Some event attributes are noisy/highly ephemeral
139+
# and can't be directly compared against.
140+
got_all_events = []
141+
imprecise_event_attributes = ["exception.stacktrace", "delay_seconds", "cause"]
142+
for span in span_list:
143+
for event in span.events:
144+
evt_attributes = event.attributes.copy()
145+
for attr_name in imprecise_event_attributes:
146+
if attr_name in evt_attributes:
147+
evt_attributes[attr_name] = "EPHEMERAL"
148+
149+
got_all_events.append((event.name, evt_attributes))
150+
151+
return got_all_events

tests/system/test_observability_options.py

Lines changed: 198 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ def test_propagation(enable_extended_tracing):
107107
gotNames = [span.name for span in from_inject_spans]
108108
wantNames = [
109109
"CloudSpanner.CreateSession",
110-
"CloudSpanner.Snapshot.execute_streaming_sql",
110+
"CloudSpanner.Snapshot.execute_sql",
111111
]
112112
assert gotNames == wantNames
113113

@@ -216,8 +216,8 @@ def select_in_txn(txn):
216216
"CloudSpanner.Database.run_in_transaction",
217217
"CloudSpanner.CreateSession",
218218
"CloudSpanner.Session.run_in_transaction",
219-
"CloudSpanner.Transaction.execute_streaming_sql",
220-
"CloudSpanner.Transaction.execute_streaming_sql",
219+
"CloudSpanner.Transaction.execute_sql",
220+
"CloudSpanner.Transaction.execute_sql",
221221
"CloudSpanner.Transaction.commit",
222222
]
223223

@@ -262,13 +262,206 @@ def select_in_txn(txn):
262262
("CloudSpanner.Database.run_in_transaction", codes.OK, None),
263263
("CloudSpanner.CreateSession", codes.OK, None),
264264
("CloudSpanner.Session.run_in_transaction", codes.OK, None),
265-
("CloudSpanner.Transaction.execute_streaming_sql", codes.OK, None),
266-
("CloudSpanner.Transaction.execute_streaming_sql", codes.OK, None),
265+
("CloudSpanner.Transaction.execute_sql", codes.OK, None),
266+
("CloudSpanner.Transaction.execute_sql", codes.OK, None),
267267
("CloudSpanner.Transaction.commit", codes.OK, None),
268268
]
269269
assert got_statuses == want_statuses
270270

271271

272+
@pytest.mark.skipif(
273+
not _helpers.USE_EMULATOR,
274+
reason="Emulator needed to run this tests",
275+
)
276+
@pytest.mark.skipif(
277+
not HAS_OTEL_INSTALLED,
278+
reason="Tracing requires OpenTelemetry",
279+
)
280+
def test_transaction_update_implicit_begin_nested_inside_commit():
281+
# Tests to ensure that transaction.commit() without a begun transaction
282+
# has transaction.begin() inlined and nested under the commit span.
283+
from google.auth.credentials import AnonymousCredentials
284+
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
285+
from opentelemetry.sdk.trace.export.in_memory_span_exporter import (
286+
InMemorySpanExporter,
287+
)
288+
from opentelemetry.trace.status import StatusCode
289+
from opentelemetry.sdk.trace import TracerProvider
290+
from opentelemetry.sdk.trace.sampling import ALWAYS_ON
291+
292+
PROJECT = _helpers.EMULATOR_PROJECT
293+
CONFIGURATION_NAME = "config-name"
294+
INSTANCE_ID = _helpers.INSTANCE_ID
295+
DISPLAY_NAME = "display-name"
296+
DATABASE_ID = _helpers.unique_id("temp_db")
297+
NODE_COUNT = 5
298+
LABELS = {"test": "true"}
299+
300+
def tx_update(txn):
301+
txn.update(
302+
"Singers",
303+
columns=["SingerId", "FirstName"],
304+
values=[["1", "Bryan"], ["2", "Slash"]],
305+
)
306+
307+
tracer_provider = TracerProvider(sampler=ALWAYS_ON)
308+
trace_exporter = InMemorySpanExporter()
309+
tracer_provider.add_span_processor(SimpleSpanProcessor(trace_exporter))
310+
observability_options = dict(
311+
tracer_provider=tracer_provider,
312+
enable_extended_tracing=True,
313+
)
314+
315+
client = Client(
316+
project=PROJECT,
317+
observability_options=observability_options,
318+
credentials=AnonymousCredentials(),
319+
)
320+
321+
instance = client.instance(
322+
INSTANCE_ID,
323+
CONFIGURATION_NAME,
324+
display_name=DISPLAY_NAME,
325+
node_count=NODE_COUNT,
326+
labels=LABELS,
327+
)
328+
329+
try:
330+
instance.create()
331+
except Exception:
332+
pass
333+
334+
db = instance.database(DATABASE_ID)
335+
try:
336+
db._ddl_statements = [
337+
"""CREATE TABLE Singers (
338+
SingerId INT64 NOT NULL,
339+
FirstName STRING(1024),
340+
LastName STRING(1024),
341+
SingerInfo BYTES(MAX),
342+
FullName STRING(2048) AS (
343+
ARRAY_TO_STRING([FirstName, LastName], " ")
344+
) STORED
345+
) PRIMARY KEY (SingerId)""",
346+
"""CREATE TABLE Albums (
347+
SingerId INT64 NOT NULL,
348+
AlbumId INT64 NOT NULL,
349+
AlbumTitle STRING(MAX),
350+
MarketingBudget INT64,
351+
) PRIMARY KEY (SingerId, AlbumId),
352+
INTERLEAVE IN PARENT Singers ON DELETE CASCADE""",
353+
]
354+
db.create()
355+
except Exception:
356+
pass
357+
358+
try:
359+
db.run_in_transaction(tx_update)
360+
except Exception:
361+
pass
362+
363+
span_list = trace_exporter.get_finished_spans()
364+
# Sort the spans by their start time in the hierarchy.
365+
span_list = sorted(span_list, key=lambda span: span.start_time)
366+
got_span_names = [span.name for span in span_list]
367+
want_span_names = [
368+
"CloudSpanner.Database.run_in_transaction",
369+
"CloudSpanner.CreateSession",
370+
"CloudSpanner.Session.run_in_transaction",
371+
"CloudSpanner.Transaction.commit",
372+
"CloudSpanner.Transaction.begin",
373+
]
374+
375+
assert got_span_names == want_span_names
376+
377+
# Our objective is to ensure that .begin() is a child of .commit()
378+
span_tx_begin = span_list[-1]
379+
span_tx_commit = span_list[-2]
380+
assert span_tx_begin.parent.span_id == span_tx_commit.context.span_id
381+
382+
got_events = []
383+
got_statuses = []
384+
385+
# Some event attributes are noisy/highly ephemeral
386+
# and can't be directly compared against.
387+
imprecise_event_attributes = ["exception.stacktrace", "delay_seconds", "cause"]
388+
for span in span_list:
389+
got_statuses.append(
390+
(span.name, span.status.status_code, span.status.description)
391+
)
392+
for event in span.events:
393+
evt_attributes = event.attributes.copy()
394+
for attr_name in imprecise_event_attributes:
395+
if attr_name in evt_attributes:
396+
evt_attributes[attr_name] = "EPHEMERAL"
397+
398+
got_events.append((event.name, evt_attributes))
399+
400+
# Check for the series of events
401+
want_events = [
402+
("Acquiring session", {"kind": "BurstyPool"}),
403+
("Waiting for a session to become available", {"kind": "BurstyPool"}),
404+
("No sessions available in pool. Creating session", {"kind": "BurstyPool"}),
405+
("Creating Session", {}),
406+
(
407+
"exception",
408+
{
409+
"exception.type": "google.api_core.exceptions.NotFound",
410+
"exception.message": "404 Table Singers: Row {Int64(1)} not found.",
411+
"exception.stacktrace": "EPHEMERAL",
412+
"exception.escaped": "False",
413+
},
414+
),
415+
(
416+
"Transaction.commit failed due to GoogleAPICallError, not retrying",
417+
{"attempt": 1},
418+
),
419+
(
420+
"exception",
421+
{
422+
"exception.type": "google.api_core.exceptions.NotFound",
423+
"exception.message": "404 Table Singers: Row {Int64(1)} not found.",
424+
"exception.stacktrace": "EPHEMERAL",
425+
"exception.escaped": "False",
426+
},
427+
),
428+
("Starting Commit", {}),
429+
(
430+
"exception",
431+
{
432+
"exception.type": "google.api_core.exceptions.NotFound",
433+
"exception.message": "404 Table Singers: Row {Int64(1)} not found.",
434+
"exception.stacktrace": "EPHEMERAL",
435+
"exception.escaped": "False",
436+
},
437+
),
438+
]
439+
assert got_events == want_events
440+
441+
# Check for the statuses.
442+
codes = StatusCode
443+
want_statuses = [
444+
(
445+
"CloudSpanner.Database.run_in_transaction",
446+
codes.ERROR,
447+
"NotFound: 404 Table Singers: Row {Int64(1)} not found.",
448+
),
449+
("CloudSpanner.CreateSession", codes.OK, None),
450+
(
451+
"CloudSpanner.Session.run_in_transaction",
452+
codes.ERROR,
453+
"NotFound: 404 Table Singers: Row {Int64(1)} not found.",
454+
),
455+
(
456+
"CloudSpanner.Transaction.commit",
457+
codes.ERROR,
458+
"NotFound: 404 Table Singers: Row {Int64(1)} not found.",
459+
),
460+
("CloudSpanner.Transaction.begin", codes.OK, None),
461+
]
462+
assert got_statuses == want_statuses
463+
464+
272465
def _make_credentials():
273466
from google.auth.credentials import AnonymousCredentials
274467

tests/unit/test__opentelemetry_tracing.py

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ def test_trace_codeless_error(self):
159159
span = span_list[0]
160160
self.assertEqual(span.status.status_code, StatusCode.ERROR)
161161

162-
def test_trace_call_terminal_span_status(self):
162+
def test_trace_call_terminal_span_status_ALWAYS_ON_sampler(self):
163163
# Verify that we don't unconditionally set the terminal span status to
164164
# SpanStatus.OK per https://github.com/googleapis/python-spanner/issues/1246
165165
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
@@ -195,3 +195,32 @@ def test_trace_call_terminal_span_status(self):
195195
("VerifyTerminalSpanStatus", StatusCode.ERROR, "Our error exhibit"),
196196
]
197197
assert got_statuses == want_statuses
198+
199+
def test_trace_call_terminal_span_status_ALWAYS_OFF_sampler(self):
200+
# Verify that we get the correct status even when using the ALWAYS_OFF
201+
# sampler which produces the NonRecordingSpan per
202+
# https://github.com/googleapis/python-spanner/issues/1286
203+
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
204+
from opentelemetry.sdk.trace.export.in_memory_span_exporter import (
205+
InMemorySpanExporter,
206+
)
207+
from opentelemetry.sdk.trace import TracerProvider
208+
from opentelemetry.sdk.trace.sampling import ALWAYS_OFF
209+
210+
tracer_provider = TracerProvider(sampler=ALWAYS_OFF)
211+
trace_exporter = InMemorySpanExporter()
212+
tracer_provider.add_span_processor(SimpleSpanProcessor(trace_exporter))
213+
observability_options = dict(tracer_provider=tracer_provider)
214+
215+
session = _make_session()
216+
used_span = None
217+
with _opentelemetry_tracing.trace_call(
218+
"VerifyWithNonRecordingSpan",
219+
session,
220+
observability_options=observability_options,
221+
) as span:
222+
used_span = span
223+
224+
assert type(used_span).__name__ == "NonRecordingSpan"
225+
span_list = list(trace_exporter.get_finished_spans())
226+
assert span_list == []

0 commit comments

Comments
 (0)