-
Notifications
You must be signed in to change notification settings - Fork 98
observability: annotate Session+SessionPool events #1207
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -81,10 +81,11 @@ def trace_call(name, session, extra_attributes=None, observability_options=None) | |
tracer = get_tracer(tracer_provider) | ||
|
||
# Set base attributes that we know for every trace created | ||
db = session._database | ||
attributes = { | ||
"db.type": "spanner", | ||
"db.url": SpannerClient.DEFAULT_ENDPOINT, | ||
"db.instance": session._database.name, | ||
"db.instance": "" if not db else db.name, | ||
"net.host.name": SpannerClient.DEFAULT_ENDPOINT, | ||
OTEL_SCOPE_NAME: TRACER_NAME, | ||
OTEL_SCOPE_VERSION: TRACER_VERSION, | ||
|
@@ -106,7 +107,10 @@ def trace_call(name, session, extra_attributes=None, observability_options=None) | |
yield span | ||
except Exception as error: | ||
span.set_status(Status(StatusCode.ERROR, str(error))) | ||
span.record_exception(error) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Let's not remove this exception recording. We are not sure whether there are any cases where the span will end up not recording an exception. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Also, I was wondering why this behavior of the exception getting added twice was not seen earlier, since this code has existed for a very long time. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It's because OpenTelemetry was upgraded only recently. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We do have locked tests that check for the exceptions, to ensure that they are in there and come from Span.enter. I had to dive back into OpenTelemetry-Python's code, as this isn't even documented, and in our demos it was very distracting to mysteriously have both errors. I think for the sake of our sanity and project stability let's leave that comment in, and if anything happens it is trivial to add back, @harshachinta. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Hmm. But the OpenTelemetry documentation for Python advises instrumentation libraries to record exceptions. Can you share a code pointer to where OpenTelemetry records the exception by default when exiting? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment.
Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @harshachinta it was the cause of us seeing 2 exceptions, and it took a ton of confusion and time for me to debug; they don't seem to document this condition. |
||
# OpenTelemetry-Python imposes invoking span.record_exception on __exit__ | ||
# on any exception. We should file a bug later on with them to only | ||
# invoke .record_exception if not already invoked, hence we should not | ||
# invoke .record_exception on our own else we shall have 2 exceptions. | ||
raise | ||
else: | ||
if (not span._status) or span._status.status_code == StatusCode.UNSET: | ||
|
@@ -116,3 +120,14 @@ def trace_call(name, session, extra_attributes=None, observability_options=None) | |
# it wasn't previously set otherwise. | ||
# https://github.com/googleapis/python-spanner/issues/1246 | ||
span.set_status(Status(StatusCode.OK)) | ||
|
||
|
||
def get_current_span():
    """Return the currently active OpenTelemetry span, if tracing is available.

    Returns:
        ``None`` when ``HAS_OPENTELEMETRY_INSTALLED`` is falsy; otherwise
        whatever ``trace.get_current_span()`` returns.
    """
    # Without OpenTelemetry there is no ``trace`` API to query, so callers
    # receive ``None`` and are expected to treat it as "no span".
    if not HAS_OPENTELEMETRY_INSTALLED:
        return None
    return trace.get_current_span()
odeke-em marked this conversation as resolved.
Show resolved
Hide resolved
odeke-em marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
|
||
def add_span_event(span, event_name, event_attributes=None):
    """Attach a named event to ``span`` when a span is present.

    Args:
        span: Object exposing ``add_event(name, attributes)``. A falsy value
            (such as ``None``) turns the call into a no-op.
        event_name: Name forwarded to ``span.add_event``.
        event_attributes: Optional attributes forwarded unchanged; defaults
            to ``None``.
    """
    # Callers may hold no span at all (e.g. tracing unavailable), so the
    # event is only recorded when there is something to record it on.
    if span:
        span.add_event(event_name, event_attributes)
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,6 +16,7 @@ | |
|
||
import datetime | ||
import queue | ||
import time | ||
|
||
from google.cloud.exceptions import NotFound | ||
from google.cloud.spanner_v1 import BatchCreateSessionsRequest | ||
|
@@ -24,6 +25,10 @@ | |
_metadata_with_prefix, | ||
_metadata_with_leader_aware_routing, | ||
) | ||
from google.cloud.spanner_v1._opentelemetry_tracing import ( | ||
add_span_event, | ||
get_current_span, | ||
) | ||
from warnings import warn | ||
|
||
_NOW = datetime.datetime.utcnow # unit tests may replace | ||
|
@@ -196,20 +201,50 @@ def bind(self, database): | |
when needed. | ||
""" | ||
self._database = database | ||
requested_session_count = self.size - self._sessions.qsize() | ||
span = get_current_span() | ||
span_event_attributes = {"kind": type(self).__name__} | ||
|
||
if requested_session_count <= 0: | ||
add_span_event( | ||
span, | ||
f"Invalid session pool size({requested_session_count}) <= 0", | ||
span_event_attributes, | ||
) | ||
return | ||
|
||
api = database.spanner_api | ||
metadata = _metadata_with_prefix(database.name) | ||
if database._route_to_leader_enabled: | ||
metadata.append( | ||
_metadata_with_leader_aware_routing(database._route_to_leader_enabled) | ||
) | ||
self._database_role = self._database_role or self._database.database_role | ||
if requested_session_count > 0: | ||
add_span_event( | ||
span, | ||
f"Requesting {requested_session_count} sessions", | ||
span_event_attributes, | ||
) | ||
|
||
if self._sessions.full(): | ||
add_span_event(span, "Session pool is already full", span_event_attributes) | ||
return | ||
|
||
request = BatchCreateSessionsRequest( | ||
database=database.name, | ||
session_count=self.size - self._sessions.qsize(), | ||
session_count=requested_session_count, | ||
session_template=Session(creator_role=self.database_role), | ||
) | ||
|
||
returned_session_count = 0 | ||
while not self._sessions.full(): | ||
request.session_count = requested_session_count - self._sessions.qsize() | ||
add_span_event( | ||
span, | ||
f"Creating {request.session_count} sessions", | ||
span_event_attributes, | ||
) | ||
resp = api.batch_create_sessions( | ||
request=request, | ||
metadata=metadata, | ||
|
@@ -218,6 +253,13 @@ def bind(self, database): | |
session = self._new_session() | ||
session._session_id = session_pb.name.split("/")[-1] | ||
self._sessions.put(session) | ||
returned_session_count += 1 | ||
|
||
add_span_event( | ||
span, | ||
f"Requested for {requested_session_count} sessions, returned {returned_session_count}", | ||
span_event_attributes, | ||
) | ||
|
||
def get(self, timeout=None): | ||
"""Check a session out from the pool. | ||
|
@@ -233,12 +275,43 @@ def get(self, timeout=None): | |
if timeout is None: | ||
timeout = self.default_timeout | ||
|
||
session = self._sessions.get(block=True, timeout=timeout) | ||
age = _NOW() - session.last_use_time | ||
start_time = time.time() | ||
current_span = get_current_span() | ||
span_event_attributes = {"kind": type(self).__name__} | ||
add_span_event(current_span, "Acquiring session", span_event_attributes) | ||
|
||
if age >= self._max_age and not session.exists(): | ||
session = self._database.session() | ||
session.create() | ||
session = None | ||
try: | ||
add_span_event( | ||
current_span, | ||
"Waiting for a session to become available", | ||
span_event_attributes, | ||
) | ||
|
||
session = self._sessions.get(block=True, timeout=timeout) | ||
age = _NOW() - session.last_use_time | ||
|
||
if age >= self._max_age and not session.exists(): | ||
if not session.exists(): | ||
add_span_event( | ||
current_span, | ||
"Session is not valid, recreating it", | ||
span_event_attributes, | ||
) | ||
session = self._database.session() | ||
session.create() | ||
# Replacing with the updated session.id. | ||
span_event_attributes["session.id"] = session._session_id | ||
|
||
span_event_attributes["session.id"] = session._session_id | ||
span_event_attributes["time.elapsed"] = time.time() - start_time | ||
add_span_event(current_span, "Acquired session", span_event_attributes) | ||
|
||
except queue.Empty as e: | ||
add_span_event( | ||
current_span, "No sessions available in the pool", span_event_attributes | ||
) | ||
raise e | ||
|
||
return session | ||
|
||
|
@@ -312,13 +385,32 @@ def get(self): | |
:returns: an existing session from the pool, or a newly-created | ||
session. | ||
""" | ||
current_span = get_current_span() | ||
span_event_attributes = {"kind": type(self).__name__} | ||
add_span_event(current_span, "Acquiring session", span_event_attributes) | ||
|
||
try: | ||
add_span_event( | ||
current_span, | ||
"Waiting for a session to become available", | ||
span_event_attributes, | ||
) | ||
session = self._sessions.get_nowait() | ||
except queue.Empty: | ||
odeke-em marked this conversation as resolved.
Show resolved
Hide resolved
|
||
add_span_event( | ||
current_span, | ||
"No sessions available in pool. Creating session", | ||
span_event_attributes, | ||
) | ||
session = self._new_session() | ||
session.create() | ||
else: | ||
if not session.exists(): | ||
odeke-em marked this conversation as resolved.
Show resolved
Hide resolved
|
||
add_span_event( | ||
current_span, | ||
"Session is not valid, recreating it", | ||
span_event_attributes, | ||
) | ||
session = self._new_session() | ||
session.create() | ||
return session | ||
|
@@ -427,6 +519,38 @@ def bind(self, database): | |
session_template=Session(creator_role=self.database_role), | ||
) | ||
|
||
span_event_attributes = {"kind": type(self).__name__} | ||
current_span = get_current_span() | ||
requested_session_count = request.session_count | ||
if requested_session_count <= 0: | ||
add_span_event( | ||
current_span, | ||
f"Invalid session pool size({requested_session_count}) <= 0", | ||
span_event_attributes, | ||
) | ||
return | ||
|
||
add_span_event( | ||
current_span, | ||
f"Requesting {requested_session_count} sessions", | ||
span_event_attributes, | ||
) | ||
|
||
if created_session_count >= self.size: | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This condition will never occur, as bind is called only once and at that time the pool is empty. Can we remove this span? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I had it in my tests as a condition for a reused pool, but sure, I can delete it. |
||
add_span_event( | ||
current_span, | ||
"Created no new sessions as sessionPool is full", | ||
span_event_attributes, | ||
) | ||
return | ||
|
||
add_span_event( | ||
current_span, | ||
f"Creating {request.session_count} sessions", | ||
span_event_attributes, | ||
) | ||
|
||
returned_session_count = 0 | ||
while created_session_count < self.size: | ||
resp = api.batch_create_sessions( | ||
request=request, | ||
|
@@ -436,8 +560,16 @@ def bind(self, database): | |
session = self._new_session() | ||
session._session_id = session_pb.name.split("/")[-1] | ||
self.put(session) | ||
returned_session_count += 1 | ||
|
||
created_session_count += len(resp.session) | ||
|
||
add_span_event( | ||
current_span, | ||
f"Requested for {requested_session_count} sessions, return {returned_session_count}", | ||
span_event_attributes, | ||
) | ||
|
||
def get(self, timeout=None): | ||
"""Check a session out from the pool. | ||
|
||
|
@@ -452,7 +584,26 @@ def get(self, timeout=None): | |
if timeout is None: | ||
timeout = self.default_timeout | ||
|
||
ping_after, session = self._sessions.get(block=True, timeout=timeout) | ||
start_time = time.time() | ||
span_event_attributes = {"kind": type(self).__name__} | ||
current_span = get_current_span() | ||
add_span_event( | ||
current_span, | ||
"Waiting for a session to become available", | ||
span_event_attributes, | ||
) | ||
|
||
ping_after = None | ||
session = None | ||
try: | ||
ping_after, session = self._sessions.get(block=True, timeout=timeout) | ||
except queue.Empty as e: | ||
add_span_event( | ||
current_span, | ||
"No sessions available in the pool within the specified timeout", | ||
span_event_attributes, | ||
) | ||
raise e | ||
|
||
if _NOW() > ping_after: | ||
# Using session.exists() guarantees the returned session exists. | ||
|
@@ -462,6 +613,14 @@ def get(self, timeout=None): | |
session = self._new_session() | ||
session.create() | ||
|
||
span_event_attributes.update( | ||
{ | ||
"time.elapsed": time.time() - start_time, | ||
"session.id": session._session_id, | ||
"kind": "pinging_pool", | ||
} | ||
) | ||
add_span_event(current_span, "Acquired session", span_event_attributes) | ||
return session | ||
|
||
def put(self, session): | ||
|
Uh oh!
There was an error while loading. Please reload this page.