Skip to content

feat(pymongo): introduce db.operation, refactor db.statement, refactor span name #3606

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `opentelemetry-instrumentation-pymongo` `aggregate` and `getMore` capture statements support
([#3601](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3601))

### Breaking changes

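- `opentelemetry-instrumentation-pymongo`: introduce the `db.operation` attribute, drop the command name from `db.statement` (now set only when statement capture is enabled), and include the collection name in the span name when it is available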
([#3606](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3606))

## Version 1.34.0/0.55b0 (2025-06-04)

### Fixed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,7 @@ def started(self, event: monitoring.CommandStartedEvent):
if not self.is_enabled or not is_instrumentation_enabled():
return
command_name = event.command_name
span_name = f"{event.database_name}.{command_name}"
statement = self._get_statement_by_command_name(command_name, event)
span_name = _get_span_name(event)
collection = event.command.get(event.command_name)

try:
Expand All @@ -147,8 +146,15 @@ def started(self, event: monitoring.CommandStartedEvent):
SpanAttributes.DB_SYSTEM, DbSystemValues.MONGODB.value
)
span.set_attribute(SpanAttributes.DB_NAME, event.database_name)
span.set_attribute(SpanAttributes.DB_STATEMENT, statement)
if collection:
span.set_attribute(SpanAttributes.DB_OPERATION, command_name)
if self.capture_statement:
db_statement = _get_statement(event)
if db_statement is not None:
span.set_attribute(
SpanAttributes.DB_STATEMENT,
_get_statement(event),
)
if collection and isinstance(collection, str):
span.set_attribute(
SpanAttributes.DB_MONGODB_COLLECTION, collection
)
Expand Down Expand Up @@ -210,15 +216,24 @@ def failed(self, event: monitoring.CommandFailedEvent):
def _pop_span(self, event: CommandEvent) -> Span | None:
return self._span_dict.pop(_get_span_dict_key(event), None)

def _get_statement_by_command_name(
    self, command_name: str, event: CommandEvent
) -> str:
    """Build the statement text for a command.

    The result always starts with ``command_name``. When
    ``self.capture_statement`` is truthy and the command document holds a
    truthy value under the key that ``COMMAND_TO_ATTRIBUTE_MAPPING`` gives
    for this command name, that value's string form is appended after a
    single space.
    """
    attribute_key = COMMAND_TO_ATTRIBUTE_MAPPING.get(command_name)
    payload = event.command.get(attribute_key)
    # Guard clause: nothing to append, so the bare command name is the result.
    if not (payload and self.capture_statement):
        return command_name
    return command_name + " " + str(payload)

def _get_span_name(event: CommandEvent) -> str:
    """Build the span name for a pymongo command event.

    Returns ``<database>.<collection>.<command>`` when the value stored in
    the command document under the command name is a non-empty string, and
    ``<database>.<command>`` otherwise (value missing, empty, or not a
    string).
    """
    operation = event.command_name
    target = event.command.get(operation)
    # Guard clause: only a non-empty string counts as a collection name.
    if not (target and isinstance(target, str)):
        return f"{event.database_name}.{operation}"
    return f"{event.database_name}.{target}.{operation}"


def _get_statement(event: CommandEvent) -> str | None:
    """Return the statement text for a pymongo command event.

    Looks up the command name in ``COMMAND_TO_ATTRIBUTE_MAPPING`` to find
    which key of the command document holds the statement, and returns that
    value formatted as a string. Returns ``None`` when the looked-up value
    is ``None`` or absent.
    """
    attribute_key = COMMAND_TO_ATTRIBUTE_MAPPING.get(event.command_name)
    payload = event.command.get(attribute_key)
    return None if payload is None else f"{payload}"


def _get_span_dict_key(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ def test_started(self):
self.assertEqual(
span.attributes[SpanAttributes.DB_NAME], "database_name"
)
self.assertEqual(span.attributes[SpanAttributes.DB_STATEMENT], "find")
self.assertEqual(span.attributes[SpanAttributes.DB_OPERATION], "find")
self.assertNotIn(SpanAttributes.DB_STATEMENT, span.attributes)
self.assertEqual(
span.attributes[SpanAttributes.NET_PEER_NAME], "test.com"
)
Expand Down Expand Up @@ -210,7 +211,10 @@ def test_capture_statement_getmore(self):

self.assertEqual(
span.attributes[SpanAttributes.DB_STATEMENT],
"getMore test_collection",
"test_collection",
)
self.assertEqual(
span.attributes[SpanAttributes.DB_OPERATION], "getMore"
)

def test_capture_statement_aggregate(self):
Expand All @@ -232,10 +236,13 @@ def test_capture_statement_aggregate(self):
self.assertEqual(len(spans_list), 1)
span = spans_list[0]

expected_statement = f"aggregate {pipeline}"
expected_statement = f"{pipeline}"
self.assertEqual(
span.attributes[SpanAttributes.DB_STATEMENT], expected_statement
)
self.assertEqual(
span.attributes[SpanAttributes.DB_OPERATION], "aggregate"
)

def test_capture_statement_disabled_getmore(self):
command_attrs = {
Expand All @@ -253,9 +260,11 @@ def test_capture_statement_disabled_getmore(self):
span = spans_list[0]

self.assertEqual(
span.attributes[SpanAttributes.DB_STATEMENT], "getMore"
span.attributes[SpanAttributes.DB_OPERATION], "getMore"
)

self.assertNotIn(SpanAttributes.DB_STATEMENT, span.attributes)

def test_capture_statement_disabled_aggregate(self):
pipeline = [{"$match": {"status": "active"}}]
command_attrs = {
Expand All @@ -273,8 +282,37 @@ def test_capture_statement_disabled_aggregate(self):
span = spans_list[0]

self.assertEqual(
span.attributes[SpanAttributes.DB_STATEMENT], "aggregate"
span.attributes[SpanAttributes.DB_OPERATION], "aggregate"
)

self.assertNotIn(SpanAttributes.DB_STATEMENT, span.attributes)

def test_endsessions_command_with_dict_list_collection(self):
    """endSessions stores a list of dicts under the command name.

    Regression test for
    https://github.com/open-telemetry/opentelemetry-python-contrib/issues/1918
    """
    session_ids = [
        {"id": {"id": "session1"}},
        {"id": {"id": "session2"}},
    ]
    attrs = {"command_name": "endSessions", "endSessions": session_ids}
    tracer = CommandTracer(self.tracer)
    event = MockEvent(attrs)
    tracer.started(event=event)
    tracer.succeeded(event=event)

    finished = self.memory_exporter.get_finished_spans()
    self.assertEqual(len(finished), 1)
    recorded = finished[0]

    # The value under the command name is not a string, so no collection
    # attribute is expected on the span.
    self.assertNotIn(
        SpanAttributes.DB_MONGODB_COLLECTION, recorded.attributes
    )
    self.assertEqual(
        recorded.attributes[SpanAttributes.DB_OPERATION], "endSessions"
    )
    # The span name is expected to carry no collection segment either.
    self.assertEqual(recorded.name, "database_name.endSessions")


class MockCommand:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ def tearDown(self):
self.instrumentor.uninstrument()
super().tearDown()

def validate_spans(self, expected_db_statement):
def validate_spans(
self, expected_db_operation, expected_db_statement=None
):
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 2)
for span in spans:
Expand Down Expand Up @@ -74,7 +76,11 @@ def validate_spans(self, expected_db_statement):
MONGODB_COLLECTION_NAME,
)
self.assertEqual(
pymongo_span.attributes[SpanAttributes.DB_STATEMENT],
pymongo_span.attributes[SpanAttributes.DB_OPERATION],
expected_db_operation,
)
self.assertEqual(
pymongo_span.attributes.get(SpanAttributes.DB_STATEMENT, None),
expected_db_statement,
)

Expand All @@ -86,11 +92,12 @@ def test_insert(self):
)
insert_result_id = insert_result.inserted_id

expected_db_operation = "insert"
expected_db_statement = (
f"insert [{{'name': 'testName', 'value': 'testValue', '_id': "
f"[{{'name': 'testName', 'value': 'testValue', '_id': "
f"ObjectId('{insert_result_id}')}}]"
)
self.validate_spans(expected_db_statement)
self.validate_spans(expected_db_operation, expected_db_statement)

def test_update(self):
"""Should create a child span for update"""
Expand All @@ -99,29 +106,32 @@ def test_update(self):
{"name": "testName"}, {"$set": {"value": "someOtherValue"}}
)

expected_db_operation = "update"
expected_db_statement = (
"update [SON([('q', {'name': 'testName'}), ('u', "
"[SON([('q', {'name': 'testName'}), ('u', "
"{'$set': {'value': 'someOtherValue'}}), ('multi', False), ('upsert', False)])]"
)
self.validate_spans(expected_db_statement)
self.validate_spans(expected_db_operation, expected_db_statement)

def test_find(self):
"""Should create a child span for find"""
with self._tracer.start_as_current_span("rootSpan"):
self._collection.find_one({"name": "testName"})

expected_db_statement = "find {'name': 'testName'}"
self.validate_spans(expected_db_statement)
expected_db_operation = "find"
expected_db_statement = "{'name': 'testName'}"
self.validate_spans(expected_db_operation, expected_db_statement)

def test_delete(self):
"""Should create a child span for delete"""
with self._tracer.start_as_current_span("rootSpan"):
self._collection.delete_one({"name": "testName"})

expected_db_operation = "delete"
expected_db_statement = (
"delete [SON([('q', {'name': 'testName'}), ('limit', 1)])]"
"[SON([('q', {'name': 'testName'}), ('limit', 1)])]"
)
self.validate_spans(expected_db_statement)
self.validate_spans(expected_db_operation, expected_db_statement)

def test_find_without_capture_statement(self):
"""Should create a child span for find"""
Expand All @@ -130,8 +140,9 @@ def test_find_without_capture_statement(self):
with self._tracer.start_as_current_span("rootSpan"):
self._collection.find_one({"name": "testName"})

expected_db_statement = "find"
self.validate_spans(expected_db_statement)
expected_db_operation = "find"
expected_db_statement = None
self.validate_spans(expected_db_operation, expected_db_statement)

def test_uninstrument(self):
# check that integration is working
Expand All @@ -152,3 +163,44 @@ def test_uninstrument(self):
self._collection.find_one()
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)

def test_session_end_no_error(self):
    """Check that ending a session causes no instrumentation errors (issue #1918)."""
    client = MongoClient(
        MONGODB_HOST, MONGODB_PORT, serverSelectionTimeoutMS=2000
    )
    # try/finally: close the client even when a call or assertion below
    # raises, so a failing test does not leak the connection.
    try:
        with self._tracer.start_as_current_span("rootSpan"):
            session = client.start_session()
            db = client[MONGODB_DB_NAME]
            collection = db[MONGODB_COLLECTION_NAME]
            # Run a simple operation while the session is open. The session
            # object is not passed to find_one, so the query is not bound
            # to it explicitly.
            collection.find_one({"test": "123"})
            # End the session - this should not cause an error
            session.end_session()

        # Verify spans were created without errors
        spans = self.memory_exporter.get_finished_spans()
        self.assertGreaterEqual(len(spans), 2)

        # NOTE(review): presumably the server only receives an endSessions
        # command when the client is closed, which happens after the spans
        # are collected here. If so, this list is always empty and the
        # checks below never run - confirm against pymongo's behaviour.
        session_end_spans = [
            s
            for s in spans
            if s.attributes.get(SpanAttributes.DB_OPERATION)
            == "endSessions"
        ]
        if session_end_spans:
            span = session_end_spans[0]
            # endSessions holds a non-string value under the command name,
            # so no collection attribute is expected.
            self.assertNotIn(
                SpanAttributes.DB_MONGODB_COLLECTION, span.attributes
            )
            # Should have other expected attributes
            self.assertEqual(
                span.attributes[SpanAttributes.DB_OPERATION], "endSessions"
            )
            self.assertEqual(
                span.attributes[SpanAttributes.DB_NAME], MONGODB_DB_NAME
            )
    finally:
        client.close()