From d5d1e594117da4c30630ec8a36abf94257d747b1 Mon Sep 17 00:00:00 2001 From: Ashwin Krishnan Date: Mon, 23 Sep 2024 21:00:52 +0000 Subject: [PATCH] fixed event loops issue --- .../src/uns_graphql/backend/graphdb.py | 18 +- 07_uns_graphql/test/queries/test_graph.py | 579 +++++++++--------- 2 files changed, 301 insertions(+), 296 deletions(-) diff --git a/07_uns_graphql/src/uns_graphql/backend/graphdb.py b/07_uns_graphql/src/uns_graphql/backend/graphdb.py index e55536f5..f756853d 100644 --- a/07_uns_graphql/src/uns_graphql/backend/graphdb.py +++ b/07_uns_graphql/src/uns_graphql/backend/graphdb.py @@ -22,6 +22,7 @@ import logging from neo4j import AsyncDriver, AsyncGraphDatabase, Record +from neo4j.exceptions import Neo4jError from uns_graphql.graphql_config import GraphDBConfig @@ -58,15 +59,18 @@ async def get_graphdb_driver(cls, retry: int = 0) -> AsyncDriver: AsyncDriver: The Neo4j async driver. """ LOGGER.debug("GraphDB driver requested") - if cls._graphdb_driver is None: - LOGGER.info("Creating a new GraphDB driver") - cls._graphdb_driver = AsyncGraphDatabase.driver( - uri=GraphDBConfig.conn_url, auth=(GraphDBConfig.user, GraphDBConfig.password), database=GraphDBConfig.database - ) try: + if cls._graphdb_driver is None: + LOGGER.info("Creating a new GraphDB driver") + cls._graphdb_driver = AsyncGraphDatabase.driver( + uri=GraphDBConfig.conn_url, + auth=(GraphDBConfig.user, GraphDBConfig.password), + database=GraphDBConfig.database, + ) + await cls._graphdb_driver.verify_connectivity() LOGGER.debug("GraphDB driver connectivity verified") - except Exception as ex: + except Neo4jError as ex: LOGGER.error("Failed to verify GraphDB driver connectivity: %s", str(ex), stack_info=True, exc_info=True) # In case of connectivity failure, close the existing driver and create a new one await cls.release_graphdb_driver() @@ -91,7 +95,7 @@ async def release_graphdb_driver(cls): try: await cls._graphdb_driver.close() LOGGER.info("GraphDB driver closed successfully") - except Exception as ex: + except Neo4jError as ex: LOGGER.error("Error closing the GraphDB driver: %s", str(ex), stack_info=True, exc_info=True) finally: cls._graphdb_driver = None diff --git a/07_uns_graphql/test/queries/test_graph.py b/07_uns_graphql/test/queries/test_graph.py index 78e5130c..969a2611 100644 --- a/07_uns_graphql/test/queries/test_graph.py +++ b/07_uns_graphql/test/queries/test_graph.py @@ -593,292 +593,293 @@ async def test_strawberry_get_spb_nodes_by_metric(metric_names: list[str], has_r assert result.errors -@pytest.fixture(scope="module") -def my_event_loop(request): # noqa: ARG001 - """Create an instance of the default event loop for each test case.""" - loop = asyncio.get_event_loop_policy().new_event_loop() - asyncio.set_event_loop(loop) - yield loop - loop.close() - - -# @pytest_asyncio.fixture(scope="module") -# async def setup_graphdb_data(my_event_loop): # noqa: ARG001 -# """Fixture to set up data in the GraphDB from the test_data.cypher file.""" -# # Read the Cypher script as read only -# with open(file=Path(__file__).resolve().parent / "test_data.cypher") as file: -# cypher_script = file.read() -# # Filter out lines that are not valid Cypher queries -# valid_queries = cypher_script.replace(":begin\n", "").replace(":commit\n", "").split(";") -# # Execute each query separately -# driver = await GraphDB.get_graphdb_driver() -# async with driver.session() as session: -# for query in valid_queries: -# if query.strip(): # Make sure the query is not empty -# await session.run(query=query) - -# # Yield control to the test -# yield - -# # Teardown code i.e. clearing the database) -# async with driver.session() as session: -# await session.run("MATCH (n) DETACH DELETE n;") -# # Release the driver -# await GraphDB.release_graphdb_driver() - - -# @pytest.mark.asyncio -# @pytest.mark.integrationtest -# @pytest.mark.parametrize( -# "topics, expected_result", -# [ -# ( -# ["+", "test/uns/ar2/ln4"], -# [ -# UNSNode( -# namespace="test", -# node_name="test", -# node_type="ENTERPRISE", -# payload=JSONPayload(data={}), -# created=datetime.fromtimestamp(1486144500, UTC), -# last_updated=datetime.fromtimestamp(1486144500, UTC), -# ), -# UNSNode( -# namespace="test/uns/ar2/ln4", -# node_name="ln4", -# node_type="LINE", -# payload=JSONPayload( -# data={ -# "TestMetric2": "TestUNSwithNestedLists", -# "timestamp": 1486144500000, -# "dict_list": [{"a": "b"}, {"x": "y"}], -# } -# ), -# created=datetime.fromtimestamp(1486144500, UTC), -# last_updated=datetime.fromtimestamp(1486144500, UTC), -# ), -# ], -# ), -# ( -# ["test/uns/ar2/#"], -# [ -# UNSNode( -# namespace="test/uns/ar2/ln4", -# node_name="ln4", -# node_type="LINE", -# payload=JSONPayload( -# data={ -# "TestMetric2": "TestUNSwithNestedLists", -# "timestamp": 1486144500000, -# "dict_list": [{"a": "b"}, {"x": "y"}], -# } -# ), -# created=datetime.fromtimestamp(1486144500, UTC), -# last_updated=datetime.fromtimestamp(1486144500, UTC), -# ), -# UNSNode( -# namespace="test/uns/ar2/ln3", -# node_name="ln3", -# node_type="LINE", -# payload=JSONPayload( -# data={ -# "TestMetric2": "TestUNSwithLists", -# "list": [1, 2, 3, 4, 5], -# "timestamp": 1486144502144, -# } -# ), -# created=datetime.fromtimestamp(1486144502.144, UTC), -# last_updated=datetime.fromtimestamp(1486144502.144, UTC), -# ), -# ], -# ), -# ( -# ["test"], -# [ -# UNSNode( -# namespace="test", -# node_name="test", -# node_type="ENTERPRISE", -# payload=JSONPayload(data={}), -# created=datetime.fromtimestamp(1486144500, UTC), -# last_updated=datetime.fromtimestamp(1486144500, UTC), -# ) -# ], -# ), -# ], -# ) -# async def test_get_uns_nodes_integration( -# setup_graphdb_data, # noqa: ARG001 -# topics: list[str], -# expected_result: list[UNSNode], -# ): -# mqtt_topic_list = [MQTTTopicInput.from_pydantic(MQTTTopic(topic=topic)) for topic in topics] -# graph_query = GraphQuery() -# try: -# result = await graph_query.get_uns_nodes(topics=mqtt_topic_list) -# except Exception as ex: -# pytest.fail(f"Should not throw any exceptions. Got {ex}") -# assert result == expected_result # Ensure the result matches the expected result - - -# @pytest.mark.asyncio -# @pytest.mark.integrationtest -# @pytest.mark.parametrize( -# "property_keys, topics, exclude_topics,expected_result", -# [ -# ( -# ["dict_list"], -# None, -# None, -# [ -# UNSNode( -# namespace="test/uns/ar2/ln4", -# node_name="ln4", -# node_type="LINE", -# payload=JSONPayload( -# data={ -# "TestMetric2": "TestUNSwithNestedLists", -# "timestamp": 1486144500000, -# "dict_list": [{"a": "b"}, {"x": "y"}], -# } -# ), -# created=datetime.fromtimestamp(1486144500, UTC), -# last_updated=datetime.fromtimestamp(1486144500, UTC), -# ), -# ], -# ), -# ( -# ["node_name"], -# ["test/uns/ar1"], -# False, -# [ -# UNSNode( -# namespace="test/uns/ar1", -# node_name="ar1", -# node_type="AREA", -# payload=JSONPayload(data={}), -# created=datetime.fromtimestamp(1486144502.122, UTC), -# last_updated=datetime.fromtimestamp(1486144502.122, UTC), -# ) -# ], -# ), -# ( -# ["TestMetric2"], -# ["test/uns/ar1/#"], -# False, -# [ -# UNSNode( -# namespace="test/uns/ar1/ln2", -# node_name="ln2", -# node_type="LINE", -# payload=JSONPayload(data={"TestMetric2": "TestUNS", "timestamp": 1486144502122}), -# created=datetime.fromtimestamp(1486144502.122, UTC), -# last_updated=datetime.fromtimestamp(1486144502.122, UTC), -# ) -# ], -# ), -# ( -# ["TestMetric2"], -# ["test/uns/ar2/#"], -# True, -# [ -# UNSNode( -# namespace="test/uns/ar1/ln2", -# node_name="ln2", -# node_type="LINE", -# payload=JSONPayload( -# data={ -# "TestMetric2": "TestUNS", -# "timestamp": 1486144502122, -# } -# ), -# created=datetime.fromtimestamp(1486144502.122, UTC), -# last_updated=datetime.fromtimestamp(1486144502.122, UTC), -# ) -# ], -# ), -# ], -# ) -# async def test_get_uns_nodes_by_property_integration( -# setup_graphdb_data, # noqa: ARG001 -# property_keys, -# topics: list[str], -# exclude_topics: bool, -# expected_result: dict, -# ): -# mqtt_topic_list = None -# if topics is not None: -# mqtt_topic_list = [MQTTTopicInput.from_pydantic(MQTTTopic(topic=topic)) for topic in topics] - -# graph_query = GraphQuery() -# try: -# result = await graph_query.get_uns_nodes_by_property( -# property_keys=property_keys, topics=mqtt_topic_list, exclude_topics=exclude_topics -# ) -# except Exception as ex: -# pytest.fail(f"Should not throw any exceptions. Got {ex}") -# assert result == expected_result # Ensure the result matches the expected result - - -# eon1_payload = { -# "timestamp": 1671554024644, -# "metrics": [ -# {"name": "Inputs/A", "alias": 0, "timestamp": 1486144502122, "datatype": 11, "value": False}, -# {"name": "Inputs/B", "alias": 1, "timestamp": 1486144502122, "datatype": 11, "value": False}, -# {"name": "Outputs/E", "alias": 2, "timestamp": 1486144502122, "datatype": 11, "value": False}, -# {"name": "Outputs/F", "alias": 3, "timestamp": 1486144502122, "datatype": 11, "value": False}, -# {"name": "Properties/Hardware Make", "alias": 4, "timestamp": 1486144502122, "datatype": 12, "value": "Sony"}, -# {"name": "Properties/Weight", "alias": 5, "timestamp": 1486144502122, "datatype": 3, "value": 200}, -# ], -# "seq": 0, -# } - - -# @pytest.mark.asyncio -# @pytest.mark.integrationtest -# @pytest.mark.parametrize( -# "metric_names, expected_result", -# [ -# ( -# ["Inputs/A"], -# [ -# SPBNode(topic="spBv1.0/uns_group/NBIRTH/eon1", payload=eon1_payload), -# SPBNode(topic="spBv1.0/uns_group/NDATA/eon1", payload=eon1_payload), -# ], -# ), -# ( -# "Inputs/A", -# [ -# SPBNode(topic="spBv1.0/uns_group/NBIRTH/eon1", payload=eon1_payload), -# SPBNode(topic="spBv1.0/uns_group/NDATA/eon1", payload=eon1_payload), -# ], -# ), -# ( -# ["Inputs/A", "Inputs/B"], -# [ -# SPBNode(topic="spBv1.0/uns_group/NBIRTH/eon1", payload=eon1_payload), -# SPBNode(topic="spBv1.0/uns_group/NDATA/eon1", payload=eon1_payload), -# ], -# ), -# ( -# ["Outputs/F"], -# [ -# SPBNode(topic="spBv1.0/uns_group/NBIRTH/eon1", payload=eon1_payload), -# SPBNode(topic="spBv1.0/uns_group/NDATA/eon1", payload=eon1_payload), -# ], -# ), -# ([], []), -# (None, []), -# ], -# ) -# async def test_get_spb_nodes_integration( -# setup_graphdb_data, # noqa: ARG001 -# metric_names: list[str], -# expected_result: list[SPBNode], -# ): -# graph_query = GraphQuery() -# try: -# result = await graph_query.get_spb_nodes_by_metric(metric_names=metric_names) -# except Exception as ex: -# pytest.fail(f"Should not throw any exceptions. Got {ex}") -# assert result == expected_result +@pytest_asyncio.fixture(scope="module") +async def setup_graphdb_data(): + current_loop = asyncio.get_event_loop() + """Fixture to set up data in the GraphDB from the test_data.cypher file.""" + # Read the Cypher script as read only + with open(file=Path(__file__).resolve().parent / "test_data.cypher") as file: + cypher_script = file.read() + # Filter out lines that are not valid Cypher queries + valid_queries = cypher_script.replace(":begin\n", "").replace(":commit\n", "").split(";") + # Execute each query separately + driver = await GraphDB.get_graphdb_driver() + async with driver.session() as session: + for query in valid_queries: + if query.strip(): # Make sure the query is not empty + await session.run(query=query) + + # Yield control to the test + yield current_loop + + # Teardown code i.e. clearing the database) + async with driver.session() as session: + await session.run("MATCH (n) DETACH DELETE n;") + # Release the driver + await GraphDB.release_graphdb_driver() + + +@pytest.mark.asyncio(scope="module", loop_scope="module") +@pytest.mark.integrationtest +# FIXME not working with VsCode https://github.com/microsoft/vscode-python/issues/19374 +# Comment this marker and run test individually in VSCode. Uncomment for running from command line / CI +@pytest.mark.xdist_group(name="graphql_graphdb") +@pytest.mark.parametrize( + "topics, expected_result", + [ + ( + ["+", "test/uns/ar2/ln4"], + [ + UNSNode( + namespace="test", + node_name="test", + node_type="ENTERPRISE", + payload=JSONPayload(data={}), + created=datetime.fromtimestamp(1486144500, UTC), + last_updated=datetime.fromtimestamp(1486144500, UTC), + ), + UNSNode( + namespace="test/uns/ar2/ln4", + node_name="ln4", + node_type="LINE", + payload=JSONPayload( + data={ + "TestMetric2": "TestUNSwithNestedLists", + "timestamp": 1486144500000, + "dict_list": [{"a": "b"}, {"x": "y"}], + } + ), + created=datetime.fromtimestamp(1486144500, UTC), + last_updated=datetime.fromtimestamp(1486144500, UTC), + ), + ], + ), + ( + ["test/uns/ar2/#"], + [ + UNSNode( + namespace="test/uns/ar2/ln3", + node_name="ln3", + node_type="LINE", + payload=JSONPayload( + data={ + "TestMetric2": "TestUNSwithLists", + "list": [1, 2, 3, 4, 5], + "timestamp": 1486144502144, + } + ), + created=datetime.fromtimestamp(1486144502.144, UTC), + last_updated=datetime.fromtimestamp(1486144502.144, UTC), + ), + UNSNode( + namespace="test/uns/ar2/ln4", + node_name="ln4", + node_type="LINE", + payload=JSONPayload( + data={ + "TestMetric2": "TestUNSwithNestedLists", + "timestamp": 1486144500000, + "dict_list": [{"a": "b"}, {"x": "y"}], + } + ), + created=datetime.fromtimestamp(1486144500, UTC), + last_updated=datetime.fromtimestamp(1486144500, UTC), + ), + ], + ), + ( + ["test"], + [ + UNSNode( + namespace="test", + node_name="test", + node_type="ENTERPRISE", + payload=JSONPayload(data={}), + created=datetime.fromtimestamp(1486144500, UTC), + last_updated=datetime.fromtimestamp(1486144500, UTC), + ) + ], + ), + ], +) +async def test_get_uns_nodes_integration( + setup_graphdb_data, # noqa: ARG001 + topics: list[str], + expected_result: list[UNSNode], +): + mqtt_topic_list = [MQTTTopicInput.from_pydantic(MQTTTopic(topic=topic)) for topic in topics] + graph_query = GraphQuery() + try: + result = await graph_query.get_uns_nodes(topics=mqtt_topic_list) + except Exception as ex: + pytest.fail(f"Should not throw any exceptions. Got {ex}") + assert result == expected_result # Ensure the result matches the expected result + + +@pytest.mark.asyncio(scope="module", loop_scope="module") +@pytest.mark.integrationtest +# FIXME not working with VsCode https://github.com/microsoft/vscode-python/issues/19374 +# Comment this marker and run test individually in VSCode. Uncomment for running from command line / CI +@pytest.mark.xdist_group(name="graphql_graphdb") +@pytest.mark.parametrize( + "property_keys, topics, exclude_topics,expected_result", + [ + ( + ["dict_list"], + None, + None, + [ + UNSNode( + namespace="test/uns/ar2/ln4", + node_name="ln4", + node_type="LINE", + payload=JSONPayload( + data={ + "TestMetric2": "TestUNSwithNestedLists", + "timestamp": 1486144500000, + "dict_list": [{"a": "b"}, {"x": "y"}], + } + ), + created=datetime.fromtimestamp(1486144500, UTC), + last_updated=datetime.fromtimestamp(1486144500, UTC), + ), + ], + ), + ( + ["node_name"], + ["test/uns/ar1"], + False, + [ + UNSNode( + namespace="test/uns/ar1", + node_name="ar1", + node_type="AREA", + payload=JSONPayload(data={}), + created=datetime.fromtimestamp(1486144502.122, UTC), + last_updated=datetime.fromtimestamp(1486144502.122, UTC), + ) + ], + ), + ( + ["TestMetric2"], + ["test/uns/ar1/#"], + False, + [ + UNSNode( + namespace="test/uns/ar1/ln2", + node_name="ln2", + node_type="LINE", + payload=JSONPayload(data={"TestMetric2": "TestUNS", "timestamp": 1486144502122}), + created=datetime.fromtimestamp(1486144502.122, UTC), + last_updated=datetime.fromtimestamp(1486144502.122, UTC), + ) + ], + ), + ( + ["TestMetric2"], + ["test/uns/ar2/#"], + True, + [ + UNSNode( + namespace="test/uns/ar1/ln2", + node_name="ln2", + node_type="LINE", + payload=JSONPayload( + data={ + "TestMetric2": "TestUNS", + "timestamp": 1486144502122, + } + ), + created=datetime.fromtimestamp(1486144502.122, UTC), + last_updated=datetime.fromtimestamp(1486144502.122, UTC), + ) + ], + ), + ], +) +async def test_get_uns_nodes_by_property_integration( + setup_graphdb_data, # noqa: ARG001 + property_keys, + topics: list[str], + exclude_topics: bool, + expected_result: dict, +): + mqtt_topic_list = None + if topics is not None: + mqtt_topic_list = [MQTTTopicInput.from_pydantic(MQTTTopic(topic=topic)) for topic in topics] + + graph_query = GraphQuery() + try: + result = await graph_query.get_uns_nodes_by_property( + property_keys=property_keys, topics=mqtt_topic_list, exclude_topics=exclude_topics + ) + except Exception as ex: + pytest.fail(f"Should not throw any exceptions. Got {ex}") + assert result == expected_result # Ensure the result matches the expected result + + +eon1_payload = { + "timestamp": 1671554024644, + "metrics": [ + {"name": "Inputs/A", "alias": 0, "timestamp": 1486144502122, "datatype": 11, "value": False}, + {"name": "Inputs/B", "alias": 1, "timestamp": 1486144502122, "datatype": 11, "value": False}, + {"name": "Outputs/E", "alias": 2, "timestamp": 1486144502122, "datatype": 11, "value": False}, + {"name": "Outputs/F", "alias": 3, "timestamp": 1486144502122, "datatype": 11, "value": False}, + {"name": "Properties/Hardware Make", "alias": 4, "timestamp": 1486144502122, "datatype": 12, "value": "Sony"}, + {"name": "Properties/Weight", "alias": 5, "timestamp": 1486144502122, "datatype": 3, "value": 200}, + ], + "seq": 0, +} + + +@pytest.mark.asyncio(scope="module", loop_scope="module") +@pytest.mark.integrationtest +# FIXME not working with VsCode https://github.com/microsoft/vscode-python/issues/19374 +# Comment this marker and run test individually in VSCode. Uncomment for running from command line / CI +@pytest.mark.xdist_group(name="graphql_graphdb") +@pytest.mark.parametrize( + "metric_names, expected_result", + [ + ( + ["Inputs/A"], + [ + SPBNode(topic="spBv1.0/uns_group/NBIRTH/eon1", payload=eon1_payload), + SPBNode(topic="spBv1.0/uns_group/NDATA/eon1", payload=eon1_payload), + ], + ), + ( + "Inputs/A", + [ + SPBNode(topic="spBv1.0/uns_group/NBIRTH/eon1", payload=eon1_payload), + SPBNode(topic="spBv1.0/uns_group/NDATA/eon1", payload=eon1_payload), + ], + ), + ( + ["Inputs/A", "Inputs/B"], + [ + SPBNode(topic="spBv1.0/uns_group/NBIRTH/eon1", payload=eon1_payload), + SPBNode(topic="spBv1.0/uns_group/NDATA/eon1", payload=eon1_payload), + ], + ), + ( + ["Outputs/F"], + [ + SPBNode(topic="spBv1.0/uns_group/NBIRTH/eon1", payload=eon1_payload), + SPBNode(topic="spBv1.0/uns_group/NDATA/eon1", payload=eon1_payload), + ], + ), + ([], []), + (None, []), + ], +) +async def test_get_spb_nodes_integration( + setup_graphdb_data, # noqa: ARG001 + metric_names: list[str], + expected_result: list[SPBNode], +): + graph_query = GraphQuery() + try: + result = await graph_query.get_spb_nodes_by_metric(metric_names=metric_names) + except Exception as ex: + pytest.fail(f"Should not throw any exceptions. Got {ex}") + assert result == expected_result