From 3617921446d6121ea247d94df0d9ffae5c697c35 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Wed, 18 Sep 2024 17:45:27 -1000 Subject: [PATCH] observability: annotate Session+SessionPool events This change adds annotations for session and session pool events to aid customers in debugging latency issues with session pool malevolence and also for maintainers to figure out which session pool type is the most appropriate. Updates #1170 --- .../spanner_v1/_opentelemetry_tracing.py | 13 ++- google/cloud/spanner_v1/pool.py | 90 ++++++++++++++++++- google/cloud/spanner_v1/session.py | 19 +++- tests/_helpers.py | 11 +++ tests/unit/test_batch.py | 4 + tests/unit/test_database.py | 4 + tests/unit/test_pool.py | 7 ++ tests/unit/test_session.py | 38 ++++++++ tests/unit/test_snapshot.py | 4 + tests/unit/test_spanner.py | 4 + tests/unit/test_transaction.py | 4 + 11 files changed, 193 insertions(+), 5 deletions(-) diff --git a/google/cloud/spanner_v1/_opentelemetry_tracing.py b/google/cloud/spanner_v1/_opentelemetry_tracing.py index 51501a07a3..3a50b890d7 100644 --- a/google/cloud/spanner_v1/_opentelemetry_tracing.py +++ b/google/cloud/spanner_v1/_opentelemetry_tracing.py @@ -78,8 +78,17 @@ def trace_call(name, session, extra_attributes=None): try: yield span except Exception as error: - span.set_status(Status(StatusCode.ERROR, str(error))) - span.record_exception(error) + set_span_error_and_record_exception(span, error) raise else: span.set_status(Status(StatusCode.OK)) + + +def set_span_error_and_record_exception(span, exc): + if exc and span: + span.set_status(Status(StatusCode.ERROR, str(exc))) + span.record_exception(exc) + + +def get_current_span(): + return trace.get_current_span() diff --git a/google/cloud/spanner_v1/pool.py b/google/cloud/spanner_v1/pool.py index 56837bfc0b..85461324ce 100644 --- a/google/cloud/spanner_v1/pool.py +++ b/google/cloud/spanner_v1/pool.py @@ -16,6 +16,7 @@ import datetime import queue +import time from google.cloud.exceptions import NotFound from google.cloud.spanner_v1 import BatchCreateSessionsRequest @@ -24,6 +25,9 @@ _metadata_with_prefix, _metadata_with_leader_aware_routing, ) +from google.cloud.spanner_v1._opentelemetry_tracing import ( + get_current_span, +) from warnings import warn _NOW = datetime.datetime.utcnow # unit tests may replace @@ -199,13 +203,32 @@ def bind(self, database): _metadata_with_leader_aware_routing(database._route_to_leader_enabled) ) self._database_role = self._database_role or self._database.database_role + requested_session_count = self.size - self._sessions.qsize() request = BatchCreateSessionsRequest( database=database.name, - session_count=self.size - self._sessions.qsize(), + session_count=requested_session_count, session_template=Session(creator_role=self.database_role), ) + current_span = get_current_span() + if requested_session_count > 0: + current_span.add_event( + f"Requesting {requested_session_count} sessions", + {"kind": "fixed_size_pool"}, + ) + + if self._sessions.full(): + current_span.add_event( + "Session pool is already full", {"kind": "fixed_size_pool"} + ) + return + + returned_session_count = 0 while not self._sessions.full(): + current_span.add_event( + f"Creating {request.session_count} sessions", + {"kind": "fixed_size_pool"}, + ) resp = api.batch_create_sessions( request=request, metadata=metadata, @@ -214,6 +237,12 @@ def bind(self, database): session = self._new_session() session._session_id = session_pb.name.split("/")[-1] self._sessions.put(session) + returned_session_count += 1 + + current_span.add_event( + f"Requested for {requested_session_count}, returned {returned_session_count}", + {"kind": "fixed_size_pool"}, + ) def get(self, timeout=None): """Check a session out from the pool. @@ -229,12 +258,23 @@ def get(self, timeout=None): if timeout is None: timeout = self.default_timeout + start_time = time.time() + current_span = get_current_span() + current_span.add_event("Acquiring session", {"kind": type(self).__name__}) session = self._sessions.get(block=True, timeout=timeout) if not session.exists(): session = self._database.session() session.create() + current_span.add_event( + "Acquired session", + { + "time.elapsed": time.time() - start_time, + "session.id": session.session_id, + "kind": type(self).__name__, + }, + ) return session def put(self, session): @@ -307,6 +347,10 @@ def get(self): :returns: an existing session from the pool, or a newly-created session. """ + start_time = time.time() + current_span = get_current_span() + current_span.add_event("Acquiring session", {"kind": type(self).__name__}) + try: session = self._sessions.get_nowait() except queue.Empty: @@ -316,6 +360,15 @@ def get(self): if not session.exists(): session = self._new_session() session.create() + else: + current_span.add_event( + "Cache hit: has usable session", + { + "id": session.session_id, + "kind": type(self).__name__, + }, + ) + return session def put(self, session): @@ -422,6 +475,18 @@ def bind(self, database): session_template=Session(creator_role=self.database_role), ) + requested_session_count = request.session_count + current_span = get_current_span() + current_span.add_event(f"Requesting {requested_session_count} sessions") + + if created_session_count >= self.size: + current_span.add_event( + "Created no new sessions as sessionPool is full", + {"kind": type(self).__name__}, + ) + return + + returned_session_count = 0 while created_session_count < self.size: resp = api.batch_create_sessions( request=request, @@ -431,8 +496,17 @@ def bind(self, database): session = self._new_session() session._session_id = session_pb.name.split("/")[-1] self.put(session) + returned_session_count += 1 + created_session_count += len(resp.session) + current_span.add_event( + "Requested for {requested_session_count} sessions, return {returned_session_count}", + { + "kind": "pinging_pool", + }, + ) + def get(self, timeout=None): """Check a session out from the pool. @@ -447,6 +521,12 @@ def get(self, timeout=None): if timeout is None: timeout = self.default_timeout + start_time = time.time() + current_span = get_current_span() + current_span.add_event( + "Waiting for a session to become available", {"kind": "pinging_pool"} + ) + ping_after, session = self._sessions.get(block=True, timeout=timeout) if _NOW() > ping_after: @@ -457,6 +537,14 @@ def get(self, timeout=None): session = self._new_session() session.create() + current_span.add_event( + "Acquired session", + { + "time.elapsed": time.time() - start_time, + "session.id": session.session_id, + "kind": "pinging_pool", + }, + ) return session def put(self, session): diff --git a/google/cloud/spanner_v1/session.py b/google/cloud/spanner_v1/session.py index 28280282f4..fc1553e068 100644 --- a/google/cloud/spanner_v1/session.py +++ b/google/cloud/spanner_v1/session.py @@ -30,7 +30,11 @@ _metadata_with_prefix, _metadata_with_leader_aware_routing, ) -from google.cloud.spanner_v1._opentelemetry_tracing import trace_call +from google.cloud.spanner_v1._opentelemetry_tracing import ( + get_current_span, + set_span_error_and_record_exception, + trace_call, +) from google.cloud.spanner_v1.batch import Batch from google.cloud.spanner_v1.snapshot import Snapshot from google.cloud.spanner_v1.transaction import Transaction @@ -113,6 +117,10 @@ def name(self): :raises ValueError: if session is not yet created """ if self._session_id is None: + err = "No session available" + current_span = get_current_span() + current_span.add_event(err) + set_span_error_and_record_exception(current_span, err) raise ValueError("No session ID set by back-end") return self._database.name + "/sessions/" + self._session_id @@ -124,8 +132,14 @@ def create(self): :raises ValueError: if :attr:`session_id` is already set. """ + current_span = get_current_span() + current_span.add_event("Creating Session") + if self._session_id is not None: - raise ValueError("Session ID already set by back-end") + err = "Session ID already set by back-end" + current_span.add_event(err) + set_span_error_and_record_exception(current_span, err) + raise ValueError(err) api = self._database.spanner_api metadata = _metadata_with_prefix(self._database.name) if self._database._route_to_leader_enabled: @@ -148,6 +162,7 @@ def create(self): metadata=metadata, ) self._session_id = session_pb.name.split("/")[-1] + current_span.add_event("Using Session", {"id": self._session_id}) def exists(self): """Test for the existence of this session. diff --git a/tests/_helpers.py b/tests/_helpers.py index 5e514f2586..206bbfdc93 100644 --- a/tests/_helpers.py +++ b/tests/_helpers.py @@ -92,3 +92,14 @@ def assertSpanAttributes( self.assertEqual(span.name, name) self.assertEqual(span.status.status_code, status) self.assertEqual(dict(span.attributes), attributes) + + def assertSpanEvents(self, name, wantEventNames=[], span=None): + if HAS_OPENTELEMETRY_INSTALLED: + if not span: + span_list = self.ot_exporter.get_finished_spans() + self.assertEqual(len(span_list) > 0, true) + span = span_list[0] + + print("\033[31massertSpanEvent\033[00m") + self.assertEqual(span.name, name) + self.assertEqual(len(span.events), len(wantEventNames)) diff --git a/tests/unit/test_batch.py b/tests/unit/test_batch.py index 2f6b5e4ae9..a7f7a6f970 100644 --- a/tests/unit/test_batch.py +++ b/tests/unit/test_batch.py @@ -611,6 +611,10 @@ def __init__(self, database=None, name=TestBatch.SESSION_NAME): self._database = database self.name = name + @property + def session_id(self): + return self.name + class _Database(object): name = "testing" diff --git a/tests/unit/test_database.py b/tests/unit/test_database.py index 90fa0c269f..6e29255fb7 100644 --- a/tests/unit/test_database.py +++ b/tests/unit/test_database.py @@ -3188,6 +3188,10 @@ def run_in_transaction(self, func, *args, **kw): self._retried = (func, args, kw) return self._committed + @property + def session_id(self): + return self.name + class _MockIterator(object): def __init__(self, *values, **kw): diff --git a/tests/unit/test_pool.py b/tests/unit/test_pool.py index 23ed3e7251..589e89545a 100644 --- a/tests/unit/test_pool.py +++ b/tests/unit/test_pool.py @@ -14,6 +14,7 @@ from functools import total_ordering +import time import unittest import mock @@ -923,6 +924,8 @@ def __init__(self, database, exists=True, transaction=None): self.create = mock.Mock() self._deleted = False self._transaction = transaction + # Generate a faux id. + self._session_id = f"time.time()" def __lt__(self, other): return id(self) < id(other) @@ -949,6 +952,10 @@ def transaction(self): txn = self._transaction = _make_transaction(self) return txn + @property + def session_id(self): + return self._session_id + class _Database(object): def __init__(self, name): diff --git a/tests/unit/test_session.py b/tests/unit/test_session.py index 2ae0cb94b8..9a0280293d 100644 --- a/tests/unit/test_session.py +++ b/tests/unit/test_session.py @@ -15,6 +15,7 @@ import google.api_core.gapic_v1.method from google.cloud.spanner_v1 import RequestOptions +from google.cloud.spanner_v1._opentelemetry_tracing import trace_call import mock from tests._helpers import ( OpenTelemetryBase, @@ -174,6 +175,43 @@ def test_create_w_database_role(self): "CloudSpanner.CreateSession", attributes=TestSession.BASE_ATTRIBUTES ) + def test_create_session_span_annotations(self): + from google.cloud.spanner_v1 import CreateSessionRequest + from google.cloud.spanner_v1 import Session as SessionRequestProto + + session_pb = self._make_session_pb( + self.SESSION_NAME, database_role=self.DATABASE_ROLE + ) + + gax_api = self._make_spanner_api() + gax_api.create_session.return_value = session_pb + database = self._make_database(database_role=self.DATABASE_ROLE) + database.spanner_api = gax_api + session = self._make_one(database, database_role=self.DATABASE_ROLE) + + with trace_call("TestSessionSpan", session): + session.create() + + self.assertEqual(session.session_id, self.SESSION_ID) + self.assertEqual(session.database_role, self.DATABASE_ROLE) + session_template = SessionRequestProto(creator_role=self.DATABASE_ROLE) + + request = CreateSessionRequest( + database=database.name, + session=session_template, + ) + + gax_api.create_session.assert_called_once_with( + request=request, + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], + ) + + wantEventNames = ["Acquering session", "Creating Session", "Using Session"] + self.assertSpanEvents("TestSessionSpan", wantEventNames) + def test_create_wo_database_role(self): from google.cloud.spanner_v1 import CreateSessionRequest diff --git a/tests/unit/test_snapshot.py b/tests/unit/test_snapshot.py index bf7363fef2..479a0d62e9 100644 --- a/tests/unit/test_snapshot.py +++ b/tests/unit/test_snapshot.py @@ -1822,6 +1822,10 @@ def __init__(self, database=None, name=TestSnapshot.SESSION_NAME): self._database = database self.name = name + @property + def session_id(self): + return self.name + class _MockIterator(object): def __init__(self, *values, **kw): diff --git a/tests/unit/test_spanner.py b/tests/unit/test_spanner.py index ab5479eb3c..ff34a109af 100644 --- a/tests/unit/test_spanner.py +++ b/tests/unit/test_spanner.py @@ -1082,6 +1082,10 @@ def __init__(self, database=None, name=TestTransaction.SESSION_NAME): self._database = database self.name = name + @property + def session_id(self): + return self.name + class _MockIterator(object): def __init__(self, *values, **kw): diff --git a/tests/unit/test_transaction.py b/tests/unit/test_transaction.py index d52fb61db1..e426f912b2 100644 --- a/tests/unit/test_transaction.py +++ b/tests/unit/test_transaction.py @@ -939,6 +939,10 @@ def __init__(self, database=None, name=TestTransaction.SESSION_NAME): self._database = database self.name = name + @property + def session_id(self): + return self.name + class _FauxSpannerAPI(object): _committed = None