Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -268,9 +268,14 @@ def instrument_connection(
setattr(
connection, _OTEL_CURSOR_FACTORY_KEY, connection.cursor_factory
)
connection.cursor_factory = _new_cursor_factory(
tracer_provider=tracer_provider
)
if isinstance(connection, psycopg.AsyncConnection):
connection.cursor_factory = _new_cursor_async_factory(
tracer_provider=tracer_provider
)
else:
connection.cursor_factory = _new_cursor_factory(
tracer_provider=tracer_provider
)
connection._is_instrumented_by_opentelemetry = True
else:
_logger.warning(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def get_dsn_parameters(self): # pylint: disable=no-self-use
return {"dbname": "test"}


class MockAsyncConnection:
class MockAsyncConnection(psycopg.AsyncConnection):
commit = mock.MagicMock(spec=types.MethodType)
commit.__name__ = "commit"

Expand Down Expand Up @@ -178,6 +178,8 @@ def test_instrumentor(self):

cnx = psycopg.connect(database="test")

self.assertTrue(issubclass(cnx.cursor_factory, MockCursor))

cursor = cnx.cursor()

query = "SELECT * FROM test"
Expand Down Expand Up @@ -209,6 +211,8 @@ def test_instrumentor_with_connection_class(self):

cnx = psycopg.Connection.connect(database="test")

self.assertTrue(issubclass(cnx.cursor_factory, MockCursor))

cursor = cnx.cursor()

query = "SELECT * FROM test"
Expand Down Expand Up @@ -239,6 +243,7 @@ def test_span_name(self):

cnx = psycopg.connect(database="test")

self.assertTrue(issubclass(cnx.cursor_factory, MockCursor))
cursor = cnx.cursor()

cursor.execute("Test query", ("param1Value", False))
Expand Down Expand Up @@ -267,6 +272,7 @@ def test_span_name(self):
def test_span_params_attribute(self):
PsycopgInstrumentor().instrument(capture_parameters=True)
cnx = psycopg.connect(database="test")
self.assertTrue(issubclass(cnx.cursor_factory, MockCursor))
query = "SELECT * FROM mytable WHERE myparam1 = %s AND myparam2 = %s"
params = ("test", 42)

Expand Down Expand Up @@ -311,6 +317,7 @@ def test_custom_tracer_provider(self):
PsycopgInstrumentor().instrument(tracer_provider=tracer_provider)

cnx = psycopg.connect(database="test")
self.assertTrue(issubclass(cnx.cursor_factory, MockCursor))
cursor = cnx.cursor()
query = "SELECT * FROM test"
cursor.execute(query)
Expand All @@ -332,6 +339,9 @@ def test_instrument_connection(self):
self.assertEqual(len(spans_list), 0)

cnx = PsycopgInstrumentor().instrument_connection(cnx)

self.assertTrue(issubclass(cnx.cursor_factory, MockCursor))

cursor = cnx.cursor()
cursor.execute(query)

Expand All @@ -350,6 +360,7 @@ def test_instrument_connection_with_instrument(self):

PsycopgInstrumentor().instrument()
cnx = PsycopgInstrumentor().instrument_connection(cnx)
self.assertTrue(issubclass(cnx.cursor_factory, MockCursor))
cursor = cnx.cursor()
cursor.execute(query)

Expand Down Expand Up @@ -422,6 +433,9 @@ async def test_wrap_async_connection_class_with_cursor(self):
async def test_async_connection():
acnx = await psycopg.AsyncConnection.connect("test")
async with acnx as cnx:
self.assertTrue(
issubclass(cnx.cursor_factory, MockAsyncCursor)
)
async with cnx.cursor() as cursor:
await cursor.execute("SELECT * FROM test")

Expand Down Expand Up @@ -450,6 +464,9 @@ async def test_instrumentor_with_async_connection_class(self):
async def test_async_connection():
acnx = await psycopg.AsyncConnection.connect("test")
async with acnx as cnx:
self.assertTrue(
issubclass(cnx.cursor_factory, MockAsyncCursor)
)
await cnx.execute("SELECT * FROM test")

await test_async_connection()
Expand All @@ -474,6 +491,7 @@ async def test_span_name_async(self):
PsycopgInstrumentor().instrument()

cnx = await psycopg.AsyncConnection.connect("test")
self.assertTrue(issubclass(cnx.cursor_factory, MockAsyncCursor))
async with cnx.cursor() as cursor:
await cursor.execute("Test query", ("param1Value", False))
await cursor.execute(
Expand All @@ -500,6 +518,7 @@ async def test_span_name_async(self):
async def test_span_params_attribute(self):
PsycopgInstrumentor().instrument(capture_parameters=True)
cnx = await psycopg.AsyncConnection.connect("test")
self.assertTrue(issubclass(cnx.cursor_factory, MockAsyncCursor))
query = "SELECT * FROM mytable WHERE myparam1 = %s AND myparam2 = %s"
params = ("test", 42)
async with cnx.cursor() as cursor:
Expand Down Expand Up @@ -543,6 +562,7 @@ async def test_tracing_is_async(self):

async def test_async_connection():
acnx = await psycopg.AsyncConnection.connect("test")
self.assertTrue(issubclass(acnx.cursor_factory, MockAsyncCursor))
async with acnx as cnx:
async with cnx.cursor() as cursor:
await cursor.execute("SELECT * FROM test", delay=delay)
Expand All @@ -557,3 +577,33 @@ async def test_async_connection():
self.assertGreater(duration, delay * 1e9)

PsycopgInstrumentor().uninstrument()

async def test_instrument_connection_uses_async_cursor_factory(self):
query = b"SELECT * FROM test"

acnx = await psycopg.AsyncConnection.connect("test")
async with acnx:
await acnx.execute(query)

spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 0)

acnx = PsycopgInstrumentor().instrument_connection(acnx)

self.assertTrue(acnx._is_instrumented_by_opentelemetry)

# The new cursor_factory should be a subclass of MockAsyncCursor,
# the async traced cursor factory returned by _new_cursor_async_factory
self.assertTrue(issubclass(acnx.cursor_factory, MockAsyncCursor))

cursor = acnx.cursor()
await cursor.execute(query)

spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 1)
span = spans_list[0]

# Check version and name in span's instrumentation info
self.assertEqualSpanInstrumentationScope(
span, opentelemetry.instrumentation.psycopg
)