diff --git a/CHANGELOG.md b/CHANGELOG.md index a47b74c30..5f21229d8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,33 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed + +## [0.4.18] - 2023-07-27 +### Added +- Added `SQLServerToParquet` flow. +- Added `SAPBW` source class. +- Added `SAPBWToDF` task class. +- Added `SAPBWToADLS` flow class. +- Added a new `end_point` parameter in `genesys_api_connection` to make it more generic. +- Added `VidClubToADLS` flow class. + +### Fixed +- Fixed a bug in `subject` (extra separator) and in `receivers` (long strings) parameters in `Outlook` connector. +- Fixed issue with credentials handling in `VidClub` source class. +- Fixed issue with missing arguments in `VidClubToDF` task class. + +### Changed +- Genesys API call method and the name changed from `genesys_generate_exports` to `genesys_api_connection`. +- Added `GET` connection inside the method `genesys_api_connection`. +- Added new parameters in the `GenesysToCSV` task to be able to extract `web message` files. +- Changed looping structure for API calls in `VidClub` source class to use time intervals. +- Changed `VidClubToDF` task class to use total_load function from source. + +### Removed +- Removed methods never used in production: `get_analitics_url_report`, `get_all_schedules_job`, `schedule_report`, +`to_df`, `delete_scheduled_report_job` and `generate_reporting_export`. + + ## [0.4.17] - 2023-06-15 ### Fixed - Fixed issue with `tzlocal` for O365 package diff --git a/tests/integration/flows/test_sap_bw_to_adls.py b/tests/integration/flows/test_sap_bw_to_adls.py new file mode 100644 index 000000000..c337336de --- /dev/null +++ b/tests/integration/flows/test_sap_bw_to_adls.py @@ -0,0 +1,53 @@ +import os +from unittest import mock + +import pandas as pd +import pytest + +from viadot.flows import SAPBWToADLS + +DATA = { + "[0CALMONTH].[LEVEL01].[DESCRIPTION]": ["January 2023"], + "date": ["2023-06-19 11:12:43+00:00"], +} + +ADLS_FILE_NAME = "test_sap_bw_to_adls.parquet" +ADLS_DIR_PATH = "raw/tests/" + + +@mock.patch( + "viadot.tasks.SAPBWToDF.run", + return_value=pd.DataFrame(data=DATA), +) +@pytest.mark.run +def test_sap_bw_to_adls_flow_run(mocked_class): + flow = SAPBWToADLS( + "test_sap_bw_to_adls_flow_run", + sapbw_credentials_key="SAP", + env="BW", + mdx_query=""" + SELECT + { + } + ON COLUMNS, + NON EMPTY + { + { [0CALMONTH].[202301] } + } + DIMENSION PROPERTIES + DESCRIPTION, + MEMBER_NAME + ON ROWS + + FROM ZCSALORD1/ZBW4_ZCSALORD1_006_BOA + + """, + mapping_dict={"[0CALMONTH].[LEVEL01].[DESCRIPTION]": "Calendar Year/Month"}, + overwrite_adls=True, + adls_dir_path=ADLS_DIR_PATH, + adls_file_name=ADLS_FILE_NAME, + ) + result = flow.run() + assert result.is_successful() + os.remove("test_sap_bw_to_adls_flow_run.parquet") + os.remove("test_sap_bw_to_adls_flow_run.json") diff --git a/tests/integration/flows/test_sql_server_to_parquet.py b/tests/integration/flows/test_sql_server_to_parquet.py new file mode 100644 index 000000000..9be9c34ed --- /dev/null +++ b/tests/integration/flows/test_sql_server_to_parquet.py @@ -0,0 +1,40 @@ +import os + +import pytest +from prefect import Flow + +from viadot.flows import SQLServerToParquet +from viadot.tasks import SQLServerToDF +from viadot.tasks.sql_server import SQLServerQuery + +SCHEMA = "sandbox" +TABLE = "test" +PATH = "test.parquet" + + +@pytest.fixture(scope="session") +def create_table(): + query_task = SQLServerQuery("AZURE_SQL") + query_task.run(f"DROP TABLE IF EXISTS {SCHEMA}.{TABLE}") 
+ query_task.run(f"CREATE TABLE {SCHEMA}.{TABLE} (Id INT, Name VARCHAR (10))") + yield True + + +def test_sql_server_to_parquet_flow(create_table): + flow = SQLServerToParquet( + name="test_flow", + sql_query=f"SELECT * FROM {SCHEMA}.{TABLE}", + local_file_path=PATH, + if_exists="fail", + sqlserver_config_key="AZURE_SQL", + timeout=3600, + ) + flow.gen_flow() + assert isinstance(flow, Flow) + assert len(flow.tasks) == 3 # Number of tasks in the flow + tasks = list(flow.tasks) + + assert isinstance(tasks[0], SQLServerToDF) + flow.run() + assert os.path.isfile(PATH) == True + os.remove(PATH) diff --git a/tests/integration/flows/test_vidclub_to_adls.py b/tests/integration/flows/test_vidclub_to_adls.py new file mode 100644 index 000000000..a0c86c2ec --- /dev/null +++ b/tests/integration/flows/test_vidclub_to_adls.py @@ -0,0 +1,31 @@ +import os +from unittest import mock + +import pandas as pd +import pytest + +from viadot.flows import VidClubToADLS + +DATA = {"col1": ["aaa", "bbb", "ccc"], "col2": [11, 22, 33]} +ADLS_FILE_NAME = "test_vid_club.parquet" +ADLS_DIR_PATH = "raw/test/" + + +@mock.patch( + "viadot.tasks.VidClubToDF.run", + return_value=pd.DataFrame(data=DATA), +) +@pytest.mark.run +def test_vidclub_to_adls_run_flow(mocked_class): + flow = VidClubToADLS( + "test_vidclub_to_adls_flow_run", + source=["test"], + from_date="2023-06-05", + overwrite_adls=True, + adls_dir_path=ADLS_DIR_PATH, + adls_file_name=ADLS_FILE_NAME, + ) + result = flow.run() + assert result.is_successful() + os.remove("test_vidclub_to_adls_flow_run.parquet") + os.remove("test_vidclub_to_adls_flow_run.json") diff --git a/tests/integration/tasks/test_bigquery.py b/tests/integration/tasks/test_bigquery.py index 963284e60..3cf184d85 100644 --- a/tests/integration/tasks/test_bigquery.py +++ b/tests/integration/tasks/test_bigquery.py @@ -17,10 +17,10 @@ def test_bigquery_to_df_success(): credentials_key=CREDENTIALS_KEY, ) df = bigquery_to_df_task.run() - expectation_columns = ["date", "name", "count", "refresh"] + expected_column = ["my_value"] assert isinstance(df, pd.DataFrame) - assert expectation_columns == list(df.columns) + assert expected_column == list(df.columns) def test_bigquery_to_df_wrong_table_name(caplog): @@ -46,7 +46,7 @@ def test_bigquery_to_df_wrong_column_name(caplog): with caplog.at_level(logging.WARNING): df = bigquery_to_df_task.run() assert f"'wrong_column_name' column is not recognized." 
in caplog.text - assert df.empty + assert isinstance(df, pd.DataFrame) def test_bigquery_to_df_wrong_query(caplog): diff --git a/tests/integration/tasks/test_genesys_task.py b/tests/integration/tasks/test_genesys_task.py index 1d97610a8..eb4978fa6 100644 --- a/tests/integration/tasks/test_genesys_task.py +++ b/tests/integration/tasks/test_genesys_task.py @@ -12,6 +12,20 @@ def var_dictionary() -> None: variables = { "start_date": datetime.now().strftime("%Y-%m-%d"), + "end_date": datetime.now().strftime("%Y-%m-%d"), + "v_list": [ + "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxx1", + "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxx2", + "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxx3", + ], + "key_list": [ + "MainIntent", + "SubIntent", + "Final Sub Intent", + "CustomerOutcomeTrack", + "LastUtterance", + "Final Main Intent", + ], "post_data_list": [ { "name": "AGENT_STATUS_DETAIL_VIEW", @@ -42,146 +56,195 @@ def var_dictionary() -> None: class MockGenesysTask: - report_data = [[None, "COMPLETED"], [None, "COMPLETED"]] - - def genesys_generate_exports(post_data_list, end_point): - report = { - "conversations": [ - { - "conversationEnd": "2020-01-01T00:00:00.00Z", - "conversationId": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx", - "conversationStart": "2020-01-01T00:00:00.00Z", - "divisionIds": [ - "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx", - "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx", - ], - "mediaStatsMinConversationMos": 4.379712366260067, - "mediaStatsMinConversationRFactor": 79.03050231933594, - "originatingDirection": "inbound", - "participants": [ - { - "externalContactId": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx", - "participantId": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx", - "participantName": "Mobile Number, Country", - "purpose": "customer", - "sessions": [ - { - "agentBullseyeRing": 1, - "ani": "tel:+xxxxxxxxxxx", - "direction": "inbound", - "dnis": "tel:+xxxxxxxxxxx", - "edgeId": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx", - "mediaType": "voice", - "protocolCallId": "xxxxxxxxxxxxxxxxxxx@xx.xxx.xxx.xxx", - "provider": "Edge", - "remoteNameDisplayable": "Mobile Number, Country", - "requestedRoutings": ["Standard"], - "routingRing": 1, - "selectedAgentId": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx", - "sessionDnis": "tel:+xxxxxxxxxxx", - "sessionId": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx", - "usedRouting": "Standard", - "mediaEndpointStats": [ - { - "codecs": ["audio/opus"], - "eventTime": "2020-01-01T00:00:00.00Z", - "maxLatencyMs": 30, - "minMos": 4.882504366160681, - "minRFactor": 92.44775390625, - "receivedPackets": 229, - }, - ], - "metrics": [ - { - "emitDate": "2020-01-01T00:00:00.00Z", - "name": "nConnected", - "value": 1, - }, - ], - "segments": [ - { - "conference": False, - "segmentEnd": "2020-01-01T00:00:00.00Z", - "segmentStart": "2020-01-01T00:00:00.00Z", - "segmentType": "system", - }, - { - "conference": False, - "disconnectType": "peer", - "queueId": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx", - "segmentEnd": "2020-01-01T00:00:00.00Z", - "segmentStart": "2020-01-01T00:00:00.00Z", - "segmentType": "interact", - }, - ], - } - ], + report_data = ([[None, "COMPLETED"], [None, "COMPLETED"]],) + + def genesys_api_connection(post_data_list, end_point, method="POST"): + if method == "GET": + report = { + "id": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx", + "startTime": "2023-06-28T10:59:48.194Z", + "participants": [ + { + "id": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx", + "startTime": "2023-06-28T10:59:48.194Z", + "connectedTime": "2023-06-28T10:59:48.194Z", + "externalContactId": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx", + "queueId": 
"xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx", + "queueName": "dev_name", + "purpose": "customer", + "wrapupRequired": None, + "mediaRoles": ["full"], + "attributes": { + "MainIntent3": "mainintent3", + "SubIntent3": "subintent3", + "SubIntent2": "subintent2", + "MainIntent4": "mainintent4", + "MainIntent1": "mainintent1", + "SubIntent1": "subintent1", + "MainIntent2": "mainintent2", + "Final Sub Intent": "finalsubintent", + "SubIntent4": "subintent4", + "CustomerOutcomeTrack4": "customeroutcome4", + "CustomerOutcomeTrack3": "customeroutcome3", + "CustomerOutcomeTrack2": "customeroutcome2", + "CustomerOutcomeTrack1": "customeroutcome1", + "LastUtterance2": "lastutterance2", + "reached": "reach", + "LastUtterance1": "lastutterance1", + "name": "dev_name", + "Final Main Intent": "finalmainintent", + "LastUtterance4": "lastutterance4", + "LastUtterance3": "lastutterance3", + "LOB": "lob", + "memberId": "123456789", }, - { - "participantId": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx", - "participantName": "xxxxxxxxxxxxxxxxxxxxx", - "purpose": "ivr", - "sessions": [ - { - "ani": "tel:+xxxxxxxxxxx", - "direction": "inbound", - "dnis": "tel:+xxxxxxxxxxx", - "edgeId": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx", - "mediaType": "voice", - "peerId": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx", - "protocolCallId": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx", - "provider": "Edge", - "remote": "Mobile Number, Country", - "remoteNameDisplayable": "xxxxxxxx, Country", - "sessionDnis": "tel:+xxxxxxxxxxx", - "sessionId": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx", - "mediaEndpointStats": [ - { - "codecs": ["audio/opus"], - "eventTime": "2020-01-01T00:00:00.00Z", - "maxLatencyMs": 30, - "minMos": 4.429814389713434, - "minRFactor": 79.03050231933594, - "receivedPackets": 229, - } - ], - "flow": { - "endingLanguage": "lt-lt", - "entryReason": "tel:+xxxxxxxxxxx", - "entryType": "dnis", - "exitReason": "TRANSFER", - "flowId": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx", - "flowName": "xxxxxxxxxxxxxxxxxxxxx", - "flowType": "INBOUNDCALL", - "flowVersion": "22.0", - "startingLanguage": "en-us", - "transferTargetAddress": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx", - "transferTargetName": "xxxxxxxxxxxxxxxxxxxxx", - "transferType": "ACD", - }, - "metrics": [ - { - "emitDate": "2020-01-01T00:00:00.00Z", - "name": "nFlow", - "value": 1, - }, - ], - "segments": [ - { - "conference": False, - "segmentEnd": "2020-01-01T00:00:00.00Z", - "segmentStart": "2020-01-01T00:00:00.00Z", - "segmentType": "system", + "calls": [], + "callbacks": [], + "chats": [], + "cobrowsesessions": [], + "emails": [], + "messages": [], + } + ], + } + else: + report = { + "conversations": [ + { + "conversationEnd": "2020-01-01T00:00:00.00Z", + "conversationId": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx", + "conversationStart": "2020-01-01T00:00:00.00Z", + "divisionIds": [ + "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx", + "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx", + ], + "mediaStatsMinConversationMos": 4.379712366260067, + "mediaStatsMinConversationRFactor": 79.03050231933594, + "originatingDirection": "inbound", + "participants": [ + { + "externalContactId": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx", + "participantId": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx", + "participantName": "Mobile Number, Country", + "purpose": "customer", + "sessions": [ + { + "agentBullseyeRing": 1, + "ani": "tel:+xxxxxxxxxxx", + "direction": "inbound", + "dnis": "tel:+xxxxxxxxxxx", + "edgeId": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx", + "mediaType": "voice", + "protocolCallId": "xxxxxxxxxxxxxxxxxxx@xx.xxx.xxx.xxx", + 
"provider": "Edge", + "remoteNameDisplayable": "Mobile Number, Country", + "requestedRoutings": ["Standard"], + "routingRing": 1, + "selectedAgentId": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx", + "sessionDnis": "tel:+xxxxxxxxxxx", + "sessionId": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx", + "usedRouting": "Standard", + "mediaEndpointStats": [ + { + "codecs": ["audio/opus"], + "eventTime": "2020-01-01T00:00:00.00Z", + "maxLatencyMs": 30, + "minMos": 4.882504366160681, + "minRFactor": 92.44775390625, + "receivedPackets": 229, + }, + ], + "metrics": [ + { + "emitDate": "2020-01-01T00:00:00.00Z", + "name": "nConnected", + "value": 1, + }, + ], + "segments": [ + { + "conference": False, + "segmentEnd": "2020-01-01T00:00:00.00Z", + "segmentStart": "2020-01-01T00:00:00.00Z", + "segmentType": "system", + }, + { + "conference": False, + "disconnectType": "peer", + "queueId": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx", + "segmentEnd": "2020-01-01T00:00:00.00Z", + "segmentStart": "2020-01-01T00:00:00.00Z", + "segmentType": "interact", + }, + ], + } + ], + }, + { + "participantId": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx", + "participantName": "xxxxxxxxxxxxxxxxxxxxx", + "purpose": "ivr", + "sessions": [ + { + "ani": "tel:+xxxxxxxxxxx", + "direction": "inbound", + "dnis": "tel:+xxxxxxxxxxx", + "edgeId": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx", + "mediaType": "voice", + "peerId": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx", + "protocolCallId": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx", + "provider": "Edge", + "remote": "Mobile Number, Country", + "remoteNameDisplayable": "xxxxxxxx, Country", + "sessionDnis": "tel:+xxxxxxxxxxx", + "sessionId": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx", + "mediaEndpointStats": [ + { + "codecs": ["audio/opus"], + "eventTime": "2020-01-01T00:00:00.00Z", + "maxLatencyMs": 30, + "minMos": 4.429814389713434, + "minRFactor": 79.03050231933594, + "receivedPackets": 229, + } + ], + "flow": { + "endingLanguage": "lt-lt", + "entryReason": "tel:+xxxxxxxxxxx", + "entryType": "dnis", + "exitReason": "TRANSFER", + "flowId": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx", + "flowName": "xxxxxxxxxxxxxxxxxxxxx", + "flowType": "INBOUNDCALL", + "flowVersion": "22.0", + "startingLanguage": "en-us", + "transferTargetAddress": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx", + "transferTargetName": "xxxxxxxxxxxxxxxxxxxxx", + "transferType": "ACD", }, - ], - } - ], - }, - ], - } - ], - "totalHits": 100, - } + "metrics": [ + { + "emitDate": "2020-01-01T00:00:00.00Z", + "name": "nFlow", + "value": 1, + }, + ], + "segments": [ + { + "conference": False, + "segmentEnd": "2020-01-01T00:00:00.00Z", + "segmentStart": "2020-01-01T00:00:00.00Z", + "segmentType": "system", + }, + ], + } + ], + }, + ], + } + ], + "totalHits": 100, + } return report def get_reporting_exports_data(): @@ -232,7 +295,7 @@ def test_genesys_conversations(mock_genesys, var_dictionary): to_csv = GenesysToCSV() file_name = to_csv.run( view_type=None, - end_point="conversations/details/query", + end_point="analytics/conversations/details/query", post_data_list=var_dictionary["post_data_list_2"], start_date=var_dictionary["start_date"], ) @@ -240,3 +303,24 @@ def test_genesys_conversations(mock_genesys, var_dictionary): mock_genesys.assert_called_once() assert file_name[0] == f"conversations_detail_{date}".upper() + ".csv" + + +@mock.patch("viadot.tasks.genesys.Genesys", return_value=MockGenesysTask) +@pytest.mark.conv +def test_genesys_webmsg(mock_genesys, var_dictionary): + to_csv = GenesysToCSV() + file_name = to_csv.run( + view_type=None, + end_point="conversations", + 
conversationId_list=var_dictionary["v_list"], + post_data_list=[""], + key_list=var_dictionary["key_list"], + start_date=var_dictionary["start_date"], + end_date=var_dictionary["end_date"], + ) + + start = var_dictionary["start_date"].replace("-", "") + end = var_dictionary["end_date"].replace("-", "") + + mock_genesys.assert_called_once() + assert file_name[0] == f"WEBMESSAGE_{start}-{end}.csv" diff --git a/tests/integration/tasks/test_sap_bw.py b/tests/integration/tasks/test_sap_bw.py new file mode 100644 index 000000000..6aea385aa --- /dev/null +++ b/tests/integration/tasks/test_sap_bw.py @@ -0,0 +1,121 @@ +import pandas as pd +import pytest + +from viadot.task_utils import credentials_loader +from viadot.tasks import SAPBWToDF + +CREDENTIALS = credentials_loader.run(credentials_secret="SAP") +sapbw_task = SAPBWToDF(sapbw_credentials=CREDENTIALS.get("BW")) + + +@pytest.fixture(scope="session") +def output_variable(): + output = { + "RETURN": { + "TYPE": "", + "ID": "", + "NUMBER": "000", + "MESSAGE": "", + "LOG_NO": "", + "LOG_MSG_NO": "000000", + "MESSAGE_V1": "", + "MESSAGE_V2": "", + "MESSAGE_V3": "", + "MESSAGE_V4": "", + "PARAMETER": "", + "ROW": 0, + "FIELD": "", + "SYSTEM": "", + }, + "STATISTIC": {"STEP": "003YPR44RQTVS3BSMZTKDYBMD"}, + "DATA": [ + { + "COLUMN": 0, + "ROW": 0, + "DATA": "January 2023", + "VALUE_DATA_TYPE": "CHAR", + "CELL_STATUS": "", + }, + { + "COLUMN": 1, + "ROW": 0, + "DATA": "202301", + "VALUE_DATA_TYPE": "NUMC", + "CELL_STATUS": "", + }, + ], + "HEADER": [ + { + "COLUMN": 0, + "ROW": 0, + "DATA": "[0CALMONTH].[LEVEL01].[DESCRIPTION]", + "VALUE_DATA_TYPE": "CHAR", + "CELL_STATUS": "", + }, + { + "COLUMN": 1, + "ROW": 0, + "DATA": "[0CALMONTH].[LEVEL01].[MEMBER_NAME]", + "VALUE_DATA_TYPE": "CHAR", + "CELL_STATUS": "", + }, + ], + } + yield output + + +@pytest.fixture(scope="session") +def user_mapping(): + mapping = { + "[0CALMONTH].[LEVEL01].[DESCRIPTION]": "Calendar Year/Month", + "[0CALMONTH].[LEVEL01].[MEMBER_NAME]": "Calendar Year/Month key", + } + yield mapping + + +@pytest.fixture(scope="session") +def mdx_query_variable(): + mdx_query = """ + SELECT + { + } + ON COLUMNS, + NON EMPTY + { + { [0CALMONTH].[202301] } + } + DIMENSION PROPERTIES + DESCRIPTION, + MEMBER_NAME + ON ROWS + + FROM ZCSALORD1/ZBW4_ZCSALORD1_006_BOA + + """ + yield mdx_query + + +df_to_test = pd.DataFrame( + data={ + "[0CALMONTH].[LEVEL01].[DESCRIPTION]": ["January 2023"], + "[0CALMONTH].[LEVEL01].[MEMBER_NAME]": ["202301"], + "date": ["2023-06-19 11:12:43+00:00"], + }, +) + + +def test_apply_user_mapping(user_mapping): + apply_mapping = sapbw_task.apply_user_mapping(df_to_test, user_mapping) + print(user_mapping.values()) + assert list(apply_mapping.columns) == list(user_mapping.values()) + assert isinstance(apply_mapping, pd.DataFrame) + + +def test_to_df(output_variable): + df = sapbw_task.to_df(output_variable) + assert isinstance(df, pd.DataFrame) + + +def test_run(mdx_query_variable, user_mapping): + df = sapbw_task.run(mdx_query_variable, user_mapping) + assert isinstance(df, pd.DataFrame) diff --git a/tests/integration/tasks/test_vid_club.py b/tests/integration/tasks/test_vid_club.py index 575e2051e..8fad7fdde 100644 --- a/tests/integration/tasks/test_vid_club.py +++ b/tests/integration/tasks/test_vid_club.py @@ -3,8 +3,11 @@ import pandas as pd import pytest +from viadot.task_utils import credentials_loader from viadot.tasks import VidClubToDF +CREDENTIALS = credentials_loader.run(credentials_secret="VIDCLUB") + class MockVidClubResponse: response_data = pd.DataFrame() @@ 
-12,7 +15,13 @@ class MockVidClubResponse: @pytest.fixture(scope="session") def var_dictionary(): - variables = {"source": "jobs", "from_date": "2022-03-23", "to_date": "2022-03-24"} + variables = { + "source": "jobs", + "from_date": "2022-03-23", + "to_date": "2022-03-24", + "items_per_page": 1, + "days_interval": 1, + } yield variables @@ -20,15 +29,92 @@ def var_dictionary(): "viadot.tasks.VidClubToDF.run", return_value=MockVidClubResponse.response_data ) def test_vid_club_to_df(var_dictionary): - source = var_dictionary["source"] - from_date = var_dictionary["from_date"] - to_date = var_dictionary["to_date"] - - vc_to_df = VidClubToDF( - source=source, - to_date=to_date, - from_date=from_date, + """ + Checks if run method returns DataFrame. + + Args: + var_dictionary: Dictionary with example arguments for run method. + """ + vc_to_df = VidClubToDF(credentials=CREDENTIALS) + + df = vc_to_df.run( + source=var_dictionary["source"], + to_date=var_dictionary["to_date"], + from_date=var_dictionary["from_date"], + items_per_page=var_dictionary["items_per_page"], + days_interval=var_dictionary["days_interval"], ) - df = vc_to_df.run() assert isinstance(df, pd.DataFrame) + + +@pytest.mark.drop_cols +def test_drop_columns(var_dictionary): + """ + Tests cols_to_drop argument in function. + + Args: + var_dictionary: Dictionary with example arguments for run method. + """ + cols_to_drop = ["regionID", "submissionDate"] + vc_to_df = VidClubToDF(credentials=CREDENTIALS) + + output_with_dropped = vc_to_df.run( + source=var_dictionary["source"], + to_date=var_dictionary["to_date"], + from_date=var_dictionary["from_date"], + items_per_page=var_dictionary["items_per_page"], + days_interval=var_dictionary["days_interval"], + cols_to_drop=cols_to_drop, + ) + + assert all(col not in output_with_dropped.columns for col in cols_to_drop) + + +@pytest.mark.drop_cols +def test_drop_columns_KeyError(var_dictionary, caplog): + """ + Tests if in case of KeyError (when passed columns in cols_to_drop are not included in DataFrame), there is returned error logger.. + + Args: + var_dictionary: Dictionary with example arguments for run method. + """ + cols_to_drop = ["Test", "submissionDate"] + vc_to_df = VidClubToDF(credentials=CREDENTIALS) + + vc_to_df.run( + source=var_dictionary["source"], + to_date=var_dictionary["to_date"], + from_date=var_dictionary["from_date"], + items_per_page=var_dictionary["items_per_page"], + days_interval=var_dictionary["days_interval"], + cols_to_drop=cols_to_drop, + ) + assert len(caplog.records) == 1 + assert caplog.records[0].levelname == "ERROR" + assert ( + f"Column(s): {cols_to_drop} don't exist in the DataFrame" + in caplog.records[0].message + ) + + +@pytest.mark.drop_cols +def test_drop_columns_TypeError(var_dictionary): + """ + Tests raising TypeError if passed columns in cols_to_drop is not a List. + + Args: + var_dictionary: Dictionary with example arguments for run method. 
+ """ + with pytest.raises(TypeError, match="Provide columns to drop in a List."): + cols_to_drop = "Test" + vc_to_df = VidClubToDF(credentials=CREDENTIALS) + + output_with_dropped = vc_to_df.run( + source=var_dictionary["source"], + to_date=var_dictionary["to_date"], + from_date=var_dictionary["from_date"], + items_per_page=var_dictionary["items_per_page"], + days_interval=var_dictionary["days_interval"], + cols_to_drop=cols_to_drop, + ) diff --git a/tests/integration/test_bigquery.py b/tests/integration/test_bigquery.py index 2efecce52..f60092346 100644 --- a/tests/integration/test_bigquery.py +++ b/tests/integration/test_bigquery.py @@ -1,48 +1,143 @@ +from typing import List + import pandas as pd +import pandas_gbq +import pytest +from numpy import ndarray +from viadot.exceptions import CredentialError, DBDataAccessError from viadot.sources import BigQuery -BIGQ = BigQuery(credentials_key="BIGQUERY_TESTS") +@pytest.fixture(scope="function") +def BIGQ(): + """ + Fixture for creating a BigQuery class instance. This fixture initializes a BigQuery client + using the provided credentials key and yields the class instance. + The class instance can be used within a test function to interact with BigQuery. + + Yields: + BigQuery: A BigQuery client instance. + """ + BQ = BigQuery(credentials_key="BIGQUERY_TESTS") + yield BQ + + +@pytest.fixture(scope="session") +def insert_into_tables() -> None: + """ + A function to insert data into a BigQuery table. In the current version, tables are deleted + after 60 days. This operation is used to secure tests and structure in a BigQuery project. + """ + table_id1 = "manigeo.manigeo_tab" + table_id2 = "manigeo.space" + df = pd.DataFrame({"my_value": ["val1", "val2", "val3"]}) + pandas_gbq.to_gbq(df, table_id1, if_exists="replace") + pandas_gbq.to_gbq(df, table_id2, if_exists="replace") + + +# SQL query for public dataset - user with access to Bigquery can also use public datasets and tables. +QUERY = """ + SELECT name, SUM(number) AS total + FROM `bigquery-public-data.usa_names.usa_1910_2013` + GROUP BY name, gender + ORDER BY total DESC + LIMIT 4 + """ + + +def test_credentials(): + """Test to see if an exception is thrown if credentials are not provided.""" + with pytest.raises(CredentialError, match=r"Credentials not found."): + BigQuery(credentials_key="BIGQUERY_TESTS_FAKE") -def test_list_project(): + +def test_list_project(BIGQ): + """ + Testing the correctness of the project name. + + Args: + BIGQ (Bigquery): Bigquery class instance. + """ project = BIGQ.get_project_id() + assert project == "manifest-geode-341308" -def test_list_datasets(): +def test_list_datasets(BIGQ): + """ + Testing the correctness of dataset names. + + Args: + BIGQ (Bigquery): Bigquery class instance. + """ datasets = list(BIGQ.list_datasets()) + assert datasets == ["manigeo", "official_empty"] -def test_list_tables(): +def test_list_tables(BIGQ): + """ + Testing the correctness of table names. + + Args: + BIGQ (Bigquery): Bigquery class instance. + """ datasets = BIGQ.list_datasets() tables = list(BIGQ.list_tables(datasets[0])) - assert tables == ["space", "test_data", "manigeo_tab"] + assert "space" and "manigeo_tab" in tables -def test_query_is_df(): - query = """ - SELECT name, SUM(number) AS total - FROM `bigquery-public-data.usa_names.usa_1910_2013` - GROUP BY name, gender - ORDER BY total DESC - LIMIT 4 - """ - df = BIGQ.query_to_df(query) +def test_list_columns(BIGQ): + """ + Testing the validity of a column name in a specific table in BigQuery and the return type. 
+ + Args: + BIGQ (Bigquery): Bigquery class instance. + """ + columns = BIGQ.list_columns(dataset_name="manigeo", table_name="space") + + assert "my_value" in columns + assert isinstance(columns, ndarray) + + +def test_query_is_df(BIGQ): + """ + Testing the return type of `query_to_df` function. It should be a Data Frame. + + Args: + BIGQ (Bigquery): Bigquery class instance. + """ + df = BIGQ.query_to_df(QUERY) assert isinstance(df, pd.DataFrame) -def test_query(): - query = """ - SELECT name, SUM(number) AS total - FROM `bigquery-public-data.usa_names.usa_1910_2013` - GROUP BY name, gender - ORDER BY total DESC - LIMIT 4 - """ - df = BIGQ.query_to_df(query) +def test_query(BIGQ): + """ + Testing the corectness of `query_to_df`execution. + + Args: + BIGQ (Bigquery): Bigquery class instance. + """ + df = BIGQ.query_to_df(QUERY) total_received = df["total"].values assert total_received == [4924235, 4818746, 4703680, 4280040] + + +def test_wrong_query(BIGQ): + """ + Testing if the exception is raised with invalid query. + + Args: + BIGQ (Bigquery): Bigquery class instance. + """ + fake_query = """ + SELECT fake_name + FROM `bigquery-public-data.usa_names.fake_table` + ORDER BY fake_name DESC + LIMIT 4 + """ + with pytest.raises(DBDataAccessError): + BIGQ.query_to_df(fake_query) diff --git a/tests/integration/test_genesys.py b/tests/integration/test_genesys.py index d18503d49..817e590b5 100644 --- a/tests/integration/test_genesys.py +++ b/tests/integration/test_genesys.py @@ -144,12 +144,6 @@ def test_environment_param(): assert g.environment != None and type(g.environment) == str -@pytest.mark.init -def test_schedule_id_param(): - g = Genesys() - assert g.schedule_id != None and type(g.schedule_id) == str - - @pytest.mark.parametrize("input_name", ["test_name", "12345", ".##@@"]) @pytest.mark.init def test_other_inicial_params(input_name): @@ -167,31 +161,16 @@ def test_connection_with_genesys_api(): ) -@mock.patch.object(Genesys, "genesys_generate_exports") +@mock.patch.object(Genesys, "genesys_api_connection") @pytest.mark.connection -def test_generate_exports(mock_api_response, var_dictionary): +def test_generate_api_connection(mock_api_response, var_dictionary): g = Genesys() - assert g.genesys_generate_exports() + assert g.genesys_api_connection() mock_api_response.assert_called() -@mock.patch.object(Genesys, "load_reporting_exports") -@pytest.mark.dependency(["test_generate_exports"]) -@pytest.mark.generate -def test_generate_reports_list(mock_load_reports, var_dictionary): - mock_load_reports.return_value = var_dictionary["entities"] - g = Genesys() - g.get_reporting_exports_data() - mock_load_reports.assert_called_once() - - @mock.patch.object(Genesys, "download_report") -@pytest.mark.dependency( - depends=[ - "test_generate_exports", - "test_generate_reports_list", - ] -) +@pytest.mark.dependency(depends=["test_generate_api_connection"]) @pytest.mark.download def test_download_reports(mock_download_files, var_dictionary): g = Genesys() @@ -207,8 +186,7 @@ def test_download_reports(mock_download_files, var_dictionary): @mock.patch("viadot.sources.genesys.handle_api_response", return_value=MockClass) @pytest.mark.dependency( depends=[ - "test_generate_exports", - "test_generate_reports_list", + "test_generate_api_connection", "test_download_reports", ] ) diff --git a/tests/integration/test_sap_bw.py b/tests/integration/test_sap_bw.py new file mode 100644 index 000000000..ce8fd046f --- /dev/null +++ b/tests/integration/test_sap_bw.py @@ -0,0 +1,75 @@ +import pytest +from pyrfc import 
Connection + +from viadot.sources import SAPBW +from viadot.task_utils import credentials_loader + +CREDENTIALS = credentials_loader.run(credentials_secret="SAP") +SAPBW = SAPBW(credentials=CREDENTIALS.get("BW")) + + +@pytest.fixture(scope="session") +def mdx_query_variable(): + mdx_query = """ + SELECT + { + } + ON COLUMNS, + NON EMPTY + { + { [0CALMONTH].[202301] } + } + DIMENSION PROPERTIES + DESCRIPTION, + MEMBER_NAME + ON ROWS + + FROM ZCSALORD1/ZBW4_ZCSALORD1_006_BOA + + """ + yield mdx_query + + +@pytest.fixture(scope="session") +def wrong_mdx_query_variable(): + wrong_mdx_query = """ + SELECT + { + } + ON COLUMNS, + NON EMPTY + { + { [0CALMONTH].[202301] * + } + DIMENSION PROPERTIES + DESCRIPTION, + MEMBER_NAME + ON ROWS + + FROM ZCSALORD1/ZBW4_ZCSALORD1_006_BOA + + """ + yield wrong_mdx_query + + +def test_get_connection(): + conn = SAPBW.get_connection() + assert isinstance(conn, Connection) + + +def test_get_all_available_columns(mdx_query_variable): + all_available_columns = SAPBW.get_all_available_columns(mdx_query_variable) + assert isinstance(all_available_columns, list) + + +def test_get_output_data(mdx_query_variable): + query_output = SAPBW.get_output_data(mdx_query_variable) + assert isinstance(query_output, dict) + + +def test_wrong_mdx_query_for_get_output_data(wrong_mdx_query_variable): + query_output = SAPBW.get_output_data(wrong_mdx_query_variable) + assert ( + query_output.get("RETURN").get("MESSAGE") + == "Syntax error: Syntax Error : ... { [0CALMONTH].[2023, row 226, item: 226" + ) diff --git a/tests/integration/test_sharepoint.py b/tests/integration/test_sharepoint.py index 8beffd210..c784fa682 100644 --- a/tests/integration/test_sharepoint.py +++ b/tests/integration/test_sharepoint.py @@ -11,25 +11,59 @@ from viadot.tasks.sharepoint import SharepointToDF -def get_url(): +def get_url() -> str: + """ + Function to get file URL. + + Returns: + str: File URL. + """ return local_config["SHAREPOINT"].get("url") @pytest.fixture(scope="session") def sharepoint(): + """ + Fixture for creating a Sharepoint class instance. + The class instance can be used within a test functions to interact with Sharepoint. + """ s = Sharepoint() yield s @pytest.fixture(scope="session") -def FILE_NAME(sharepoint): +def file_name(sharepoint): + """ + A function built to get the path to a file. + + Args: + sharepoint (Sharepoint): Sharepoint class instance. + """ path = "Questionnaires.xlsx" sharepoint.download_file(download_to_path=path, download_from_path=get_url()) yield path os.remove(path) -def test_credentials(): +def test_credentials_not_found(): + """ + Testing if a VauleError is thrown when none of credentials are given. + + Args: + sharepoint (Sharepoint): Sharepoint class instance. + """ + none_credentials = None + with pytest.raises(CredentialError, match=r"Credentials not found."): + Sharepoint(credentials=none_credentials) + + +def test_get_connection_credentials(): + """ + Testing if a CredentialError is thrown when credentials doesn't contain required keys. + + Args: + sharepoint (Sharepoint): Sharepoint class instance. + """ credentials = {"site": "tenant.sharepoint.com", "username": "User"} s = Sharepoint(credentials=credentials) with pytest.raises(CredentialError, match="Missing credentials."): @@ -37,6 +71,12 @@ def test_credentials(): def test_connection(sharepoint): + """ + Testing if connection is succesfull with given credentials. + + Args: + sharepoint (Sharepoint): Sharepoint class instance. 
+ """ credentials = local_config.get("SHAREPOINT") site = f'https://{credentials["site"]}' conn = sharepoint.get_connection() @@ -45,6 +85,7 @@ def test_connection(sharepoint): def test_sharepoint_to_df_task(): + """Testing if result of `SharepointToDF` is a Data Frame.""" task = SharepointToDF() credentials_secret = PrefectSecret("SHAREPOINT_KV").run() res = task.run( @@ -57,31 +98,67 @@ def test_sharepoint_to_df_task(): os.remove("Questionnaires.xlsx") -def test_file_download(FILE_NAME): +def test_download_file_missing_patameters(sharepoint): + """ + Testing if a VauleError is thrown when none of the parameters are given. + + Args: + sharepoint (Sharepoint): Sharepoint class instance. + """ + with pytest.raises(ValueError, match=r"Missing required parameter"): + sharepoint.download_file(download_to_path=None, download_from_path=None) + + +def test_file_download(file_name): + """ + Testing if file is downloaded. + + Args: + file_name (str): File name. + """ files = [] for file in os.listdir(): if os.path.isfile(os.path.join(file)): files.append(file) - assert FILE_NAME in files + assert file_name in files -def test_autopopulating_download_from(FILE_NAME): - assert os.path.basename(get_url()) == FILE_NAME +def test_autopopulating_download_from(file_name): + """ + Testing if file name is correct. + Args: + file_name (str): File name. + """ + assert os.path.basename(get_url()) == file_name -def test_file_extension(sharepoint): + +def test_file_extension(): + """Testing if file has correct extension.""" file_ext = (".xlsm", ".xlsx") assert get_url().endswith(file_ext) -def test_file_to_df(FILE_NAME): - df = pd.read_excel(FILE_NAME, sheet_name=0) +def test_file_to_df(file_name): + """ + Testing if downloaded file contains data and if first sheet can be build as a Data frame. + + Args: + file_name (str): File name. + """ + df = pd.read_excel(file_name, sheet_name=0) df_test = pd.DataFrame(data={"col1": [1, 2]}) assert type(df) == type(df_test) -def test_get_data_types(FILE_NAME): - df = pd.read_excel(FILE_NAME, sheet_name=0) +def test_get_data_types(file_name): + """ + Testing if downloaded file contains data and columns have `String` type. + + Args: + file_name (str): File name. + """ + df = pd.read_excel(file_name, sheet_name=0) dtypes_map = df_get_data_types_task.run(df) dtypes = dtypes_map.values() diff --git a/tests/integration/test_vid_club.py b/tests/integration/test_vid_club.py index 4a75c911e..6c2bd4544 100644 --- a/tests/integration/test_vid_club.py +++ b/tests/integration/test_vid_club.py @@ -1,3 +1,4 @@ +from datetime import datetime, timedelta from unittest import mock import pandas as pd @@ -5,6 +6,10 @@ from viadot.exceptions import ValidationError from viadot.sources import VidClub +from viadot.task_utils import credentials_loader + +CREDENTIALS = credentials_loader.run(credentials_secret="VIDCLUB") +vc = VidClub(credentials=CREDENTIALS) @pytest.fixture @@ -22,24 +27,22 @@ def json(): return df -@pytest.mark.init -def test_create_club_class(): - vc = VidClub() - assert vc - - @pytest.mark.init def test_default_credential_param(): - vc = VidClub() - assert vc.credentials != None and type(vc.credentials) == dict + """ + Checks if credentials are loaded from Azure Key Vault or PrefectSecret or from local config ursing credentials_loader and if it's dictionary type. 
+    """
+    assert vc.credentials is not None and isinstance(vc.credentials, dict)


-@pytest.mark.proper
+@pytest.mark.build_query
 def test_build_query_wrong_source():
+    """
+    Checks if passing a source other than Literal["jobs", "product", "company", "survey"] is caught and raises a ValidationError.
+    """
     with pytest.raises(
         ValidationError, match=r"Pick one these sources: jobs, product, company, survey"
     ):
-        vc = VidClub()
         query = vc.build_query(
             source="test",
             from_date="2023-03-24",
@@ -49,12 +52,86 @@
     )


+@pytest.mark.build_query
+def test_url_string():
+    """
+    Checks if the function generates a URL with the needed parameters.
+    """
+    source = "jobs"
+    from_date = "2023-03-24"
+    to_date = "2023-03-24"
+    api_url = "https://api/test/"
+    items_per_page = 1
+
+    expected_elements = [
+        f"from={from_date}",
+        f"to={to_date}",
+        "region=all",
+        f"limit={items_per_page}",
+        api_url,
+    ]
+
+    query = vc.build_query(
+        source=source,
+        from_date=from_date,
+        to_date=to_date,
+        api_url=api_url,
+        items_per_page=items_per_page,
+    )
+
+    assert all(ex in query for ex in expected_elements)
+
+
+@pytest.mark.intervals
+def test_intervals_split():
+    """
+    Checks if the provided date range with days_interval creates lists with the expected split.
+    """
+    from_date = "2022-01-01"
+    to_date = "2022-01-19"
+    days_interval = 5
+    expected_starts = ["2022-01-01", "2022-01-06", "2022-01-11", "2022-01-16"]
+    expected_ends = ["2022-01-06", "2022-01-11", "2022-01-16", "2022-01-19"]
+    starts, ends = vc.intervals(
+        from_date=from_date, to_date=to_date, days_interval=days_interval
+    )
+
+    assert starts == expected_starts
+    assert ends == expected_ends
+
+
+@pytest.mark.connection_check
+def test_check_connection():
+    """
+    Checks if the check_connection method returns a tuple with a dictionary and a string.
+    """
+    output = vc.check_connection(
+        source="jobs",
+        from_date="2023-03-24",
+        to_date="2023-03-24",
+        items_per_page=1,
+    )
+
+    response, first_url = vc.check_connection(
+        source="jobs",
+        from_date="2023-03-24",
+        to_date="2023-03-24",
+        items_per_page=1,
+    )
+
+    assert isinstance(output, tuple)
+    assert isinstance(response, dict)
+    assert isinstance(first_url, str)
+
+
 @pytest.mark.proper
 def test_get_response_wrong_source():
+    """
+    Checks if ValidationError is raised when passing a wrong source name.
+    """
     with pytest.raises(
         ValidationError, match=r"The source has to be: jobs, product, company or survey"
     ):
-        vc = VidClub()
         query = vc.get_response(source="test")


@@ -64,27 +141,104 @@
 @pytest.mark.parametrize("source", ["jobs", "company", "product", "survey"])
 @pytest.mark.proper
 def test_get_response_sources(mock_api_response, source):
-    vc = VidClub()
+    """
+    Checks if the get_response method returns a DataFrame for each of the 4 possible sources.
+    The test asserts that the mock was called exactly once.
+
+    Args:
+        mock_api_response: Mocked return_value for get_response method.
+        source: The endpoint source to be accessed.
+    """
     query = vc.get_response(source=source, to_date="2023-03-24", from_date="2023-03-24")

     assert isinstance(query, pd.DataFrame)
+    mock_api_response.assert_called_once()


 @pytest.mark.proper
 def test_get_response_wrong_date():
+    """
+    Checks if ValidationError is raised when passing from_date earlier than 2022-03-22.
+    """
     with pytest.raises(
-        ValidationError, match=r"from_date cannot be earlier than 2023-03-22!!"
+        ValidationError, match=r"from_date cannot be earlier than 2022-03-22"
     ):
-        vc = VidClub()
-        query = vc.get_response(source="jobs", from_date="2021-05-09")
+        vc.get_response(source="jobs", from_date="2021-05-09")


 @pytest.mark.proper
 def test_get_response_wrong_date_range():
+    """
+    Checks if ValidationError is raised when passing to_date earlier than from_date.
+    """
     with pytest.raises(
-        ValidationError, match=r"to_date cannot be earlier than from_date!"
+        ValidationError, match=r"to_date cannot be earlier than from_date"
     ):
-        vc = VidClub()
-        query = vc.get_response(
-            source="jobs", to_date="2022-05-04", from_date="2022-05-05"
-        )
+        vc.get_response(source="jobs", to_date="2022-05-04", from_date="2022-05-05")
+
+
+@pytest.mark.total_load
+def test_total_load_for_the_same_dates():
+    """
+    The total_load method includes logic for the situation when from_date == to_date. In this scenario the interval split is skipped and only the get_response method is used.
+    This test checks that this logic executes without error and that the returned object is a DataFrame.
+    """
+    from_date = "2022-04-01"
+    to_date = "2022-04-01"
+    days_interval = 10
+    source = "jobs"
+    df = vc.total_load(
+        from_date=from_date, to_date=to_date, days_interval=days_interval, source=source
+    )
+
+    assert isinstance(df, pd.DataFrame)
+
+
+@pytest.mark.total_load
+def test_total_load_for_intervals():
+    """
+    Checks if the intervals are properly looped over in the total_load method. First we check if the returned object is a DataFrame,
+    then we check if the DataFrame returned for a smaller date range contains fewer rows than the DataFrame returned for a bigger date range.
+    """
+    from_date = "2022-04-01"
+    to_date = "2022-04-12"
+    days_interval = 2
+    source = "jobs"
+
+    date_object = datetime.strptime(from_date, "%Y-%m-%d") + timedelta(
+        days=days_interval
+    )
+    one_interval = date_object.strftime("%Y-%m-%d")
+
+    df = vc.total_load(
+        from_date=from_date, to_date=to_date, days_interval=days_interval, source=source
+    )
+    df_one_interval = vc.total_load(
+        from_date=from_date,
+        to_date=one_interval,
+        days_interval=days_interval,
+        source=source,
+    )
+
+    assert isinstance(df, pd.DataFrame)
+    assert len(df) > len(df_one_interval)
+
+
+@pytest.mark.total_load
+def test_drop_duplicates():
+    """
+    Checks the logic for dropping duplicated rows that is included in the total_load method.
+    The test checks that the returned DataFrame has no duplicates.
+ """ + from_date = "2022-04-01" + to_date = "2022-04-12" + days_interval = 2 + source = "jobs" + + df = vc.total_load( + from_date=from_date, to_date=to_date, days_interval=days_interval, source=source + ) + dups_mask = df.duplicated() + df_check = df[dups_mask] + + assert len(df_check) == 0 diff --git a/tests/test_viadot.py b/tests/test_viadot.py index 31b71febf..5bcee77ba 100644 --- a/tests/test_viadot.py +++ b/tests/test_viadot.py @@ -2,4 +2,4 @@ def test_version(): - assert __version__ == "0.4.17" + assert __version__ == "0.4.18" diff --git a/tests/unit/test_base.py b/tests/unit/test_base.py index f36115b6f..c20336b29 100644 --- a/tests/unit/test_base.py +++ b/tests/unit/test_base.py @@ -1,5 +1,6 @@ -import os import logging +import os + import pandas as pd import pyarrow as pa import pytest diff --git a/tests/unit/test_outlook.py b/tests/unit/test_outlook.py index 38b98c141..d2935ca93 100644 --- a/tests/unit/test_outlook.py +++ b/tests/unit/test_outlook.py @@ -1,13 +1,13 @@ import os -import pytest -from unittest import mock from datetime import datetime +from unittest import mock import pandas as pd +import pytest from O365.account import Account -from viadot.sources import Outlook from viadot.exceptions import CredentialError +from viadot.sources import Outlook @pytest.fixture @@ -21,6 +21,7 @@ def var_dictionary(): "client_secret": "abcdefghijklmnopqrstuvwxyz", "tenant_id": "abcdefghijklmnopqrstuvwxyz", }, + "final_dict_folders": {"Mailbox": MockValue}, } return variables @@ -36,6 +37,42 @@ def mailbox(): return None +class MockValue: + name = "mailbox" + + def get_messages(limit=1): + return [MockMessage] + + +class MockMessage: + subject = "subject" + received = "2023-04-12T06:09:59+00:00" + categories = ["categories"] + conversation_index = "aaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + def to_api_data(): + data = { + "toRecipients": [ + { + "emailAddress": { + "address": "random@random.com", + "name": "random", + } + }, + { + "emailAddress": { + "address": "random@random2.com", + "name": "random", + } + }, + ], + "from": {"emailAddress": {"address": "random@random.ee", "name": "name"}}, + "receivedDateTime": "2022-04-01T06:09:59+00:00", + "conversationId": "bbbb", + } + return data + + @pytest.mark.basics def test_outlook_credentials(var_dictionary): o = Outlook( @@ -53,6 +90,36 @@ def test_outlook_credentials(var_dictionary): ) +@pytest.mark.basics +@mock.patch("viadot.sources.outlook.Account", return_value=MockClass) +def test_outlook_mailbox_limit(mock_api_Account, var_dictionary): + o = Outlook( + mailbox_name=var_dictionary["mailbox_name"], + credentials=var_dictionary["credentials"], + start_date=var_dictionary["start_date"], + end_date=var_dictionary["end_date"], + ) + data = o._get_messages_from_mailbox( + var_dictionary["final_dict_folders"], address_limit=20 + ) + assert len(data[0]["recivers"]) <= 20 + + +@pytest.mark.basics +@mock.patch("viadot.sources.outlook.Account", return_value=MockClass) +def test_outlook_mailbox_space(mock_api_Account, var_dictionary): + o = Outlook( + mailbox_name=var_dictionary["mailbox_name"], + credentials=var_dictionary["credentials"], + start_date=var_dictionary["start_date"], + end_date=var_dictionary["end_date"], + ) + data = o._get_messages_from_mailbox( + var_dictionary["final_dict_folders"], address_limit=5 + ) + assert len(data[0]["recivers"]) == 0 + + @pytest.mark.exceptions def test_outlook_credential_exception(var_dictionary): with pytest.raises(CredentialError): diff --git a/viadot/__init__.py b/viadot/__init__.py index 
ac1552129..2dd553604 100644 --- a/viadot/__init__.py +++ b/viadot/__init__.py @@ -1 +1 @@ -__version__ = "0.4.17" +__version__ = "0.4.18" diff --git a/viadot/flows/__init__.py b/viadot/flows/__init__.py index f91cd087c..332a010bd 100644 --- a/viadot/flows/__init__.py +++ b/viadot/flows/__init__.py @@ -30,6 +30,11 @@ except ImportError: pass +try: + from .sap_bw_to_adls import SAPBWToADLS +except ImportError: + pass + from .customer_gauge_to_adls import CustomerGaugeToADLS from .epicor_to_duckdb import EpicorOrdersToDuckDB from .eurostat_to_adls import EurostatToADLS @@ -38,4 +43,6 @@ from .mindful_to_adls import MindfulToADLS from .sftp_operations import SftpToADLS, SftpToAzureSQL from .sql_server_to_duckdb import SQLServerToDuckDB +from .sql_server_to_parquet import SQLServerToParquet from .sql_server_transform import SQLServerTransform +from .vid_club_to_adls import VidClubToADLS diff --git a/viadot/flows/genesys_to_adls.py b/viadot/flows/genesys_to_adls.py index fbcb6df23..4f8c54f3e 100644 --- a/viadot/flows/genesys_to_adls.py +++ b/viadot/flows/genesys_to_adls.py @@ -80,15 +80,16 @@ def __init__( view_type: str = None, view_type_time_sleep: int = 80, post_data_list: List[str] = None, - end_point: str = "reporting/exports", + end_point: str = "analytics/reporting/exports", list_of_userids: list = None, start_date: str = None, end_date: str = None, sep: str = "\t", environment: str = None, - schedule_id: str = None, report_url: str = None, report_columns: List[str] = None, + conversationId_list: List[str] = None, + key_list: List[str] = None, local_file_path: str = "", adls_file_path: str = None, overwrite_adls: bool = True, @@ -125,16 +126,17 @@ def __init__( >>> "hasCustomParticipantAttributes": True, >>> }]''' If you need to add more POSTs in the same call, just add them to the list separated by a comma. - endpoint (str, optional): Final end point for Genesys connection. Defaults to "reporting/exports". + end_point (str, optional): Final end point for Genesys connection. Defaults to "analytics/reporting/exports". list_of_userids (list, optional): List of all user IDs to select in the data frame. Defaults to None. start_date (str, optional): Start date of the report. Defaults to None. end_date (str, optional): End date of the report. Defaults to None. sep (str, optional): Separator in csv file. Defaults to "\t". environment (str, optional): Adress of host server. Defaults to None than will be used enviroment from credentials. - schedule_id (str, optional): The ID of report. Defaults to None. report_url (str, optional): The url of report generated in json response. Defaults to None. report_columns (List[str], optional): List of exisiting column in report. Defaults to None. + conversationId_list (List[str], optional): List of conversationId passed as attribute of GET method. Defaults to None. + key_list (List[str], optional): List of keys needed to specify the columns in the GET request method. Defaults to None. local_file_path (str, optional): The local path from which to upload the file(s). Defaults to "". adls_file_path (str, optional): The destination path at ADLS. Defaults to None. overwrite_adls (bool, optional): Whether to overwrite files in the data lake. Defaults to True. 
@@ -150,15 +152,16 @@ def __init__(
         self.view_type_time_sleep = view_type_time_sleep
         self.post_data_list = post_data_list
         self.end_point = end_point
-        if self.end_point == "conversations/details/query":
+        if self.end_point == "analytics/conversations/details/query":
             self.apply_method = True
         else:
             self.apply_method = False
         self.list_of_userids = list_of_userids
         self.environment = environment
-        self.schedule_id = schedule_id
         self.report_url = report_url
         self.report_columns = report_columns
+        self.conversationId_list = conversationId_list
+        self.key_list = key_list
         self.start_date = start_date
         self.end_date = end_date
         self.sep = sep
@@ -176,7 +179,6 @@ def __init__(
         self.gen_flow()

     def gen_flow(self) -> Flow:
-
         to_csv = GenesysToCSV(
             timeout=self.timeout,
             local_file_path=self.local_file_path,
@@ -191,6 +193,8 @@ def gen_flow(self) -> Flow:
             start_date=self.start_date,
             end_date=self.end_date,
             environment=self.environment,
+            conversationId_list=self.conversationId_list,
+            key_list=self.key_list,
             credentials_genesys=self.credentials_genesys,
             flow=self,
         )
diff --git a/viadot/flows/sap_bw_to_adls.py b/viadot/flows/sap_bw_to_adls.py
new file mode 100644
index 000000000..375f30775
--- /dev/null
+++ b/viadot/flows/sap_bw_to_adls.py
@@ -0,0 +1,167 @@
+import os
+from pathlib import Path
+from typing import Any, Dict, List, Literal
+
+import pendulum
+from prefect import Flow
+from prefect.backend import set_key_value
+
+from viadot.task_utils import (
+    add_ingestion_metadata_task,
+    df_get_data_types_task,
+    df_map_mixed_dtypes_for_parquet,
+    df_to_csv,
+    df_to_parquet,
+    dtypes_to_json_task,
+    update_dtypes_dict,
+)
+from viadot.tasks import AzureDataLakeUpload, SAPBWToDF
+
+
+class SAPBWToADLS(Flow):
+    def __init__(
+        self,
+        name: str,
+        mdx_query: str,
+        mapping_dict: dict = None,
+        sapbw_credentials: dict = None,
+        sapbw_credentials_key: str = "SAP",
+        env: str = "BW",
+        output_file_extension: str = ".parquet",
+        local_file_path: str = None,
+        adls_file_name: str = None,
+        adls_dir_path: str = None,
+        if_exists: Literal["replace", "append", "delete"] = "replace",
+        overwrite_adls: bool = True,
+        vault_name: str = None,
+        sp_credentials_secret: str = None,
+        *args: List[any],
+        **kwargs: Dict[str, Any],
+    ):
+        """
+        Flow for downloading data from SAP BW to a file, then uploading it to ADLS.
+
+        Args:
+            name (str): Name of the flow.
+            mdx_query (str): MDX query to be passed to the SAP BW server.
+            mapping_dict (dict, optional): Dictionary with the original column names and the user-defined names to map them onto. If provided, the mapping is applied to the columns; if None, the schema JSON file is generated with the original column names. Defaults to None.
+            sapbw_credentials (dict, optional): Credentials to SAP in dictionary format. Defaults to None.
+            sapbw_credentials_key (str, optional): Azure Key Vault secret key. Defaults to "SAP".
+            env (str, optional): SAP environment. Defaults to "BW".
+            output_file_extension (str, optional): Output file extension - to allow selection between .csv and .parquet. Defaults to ".parquet".
+            local_file_path (str, optional): Local destination path. Defaults to None.
+            adls_file_name (str, optional): Azure Data Lake file name. Defaults to None.
+            adls_dir_path (str, optional): Azure Data Lake destination file path. Defaults to None.
+            if_exists (Literal["replace", "append", "delete"], optional): What to do if the local file already exists. Defaults to "replace".
+            overwrite_adls (bool, optional): Whether to overwrite the file in ADLS. Defaults to True.
+ vault_name (str, optional): The name of the vault from which to obtain the secrets.. Defaults to None. + sp_credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary with ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET). Defaults to None. + """ + self.sapbw_credentials = sapbw_credentials + self.sapbw_credentials_key = sapbw_credentials_key + self.env = env + self.mdx_query = mdx_query + self.mapping_dict = mapping_dict + self.output_file_extension = output_file_extension + + self.local_file_path = ( + local_file_path or self.slugify(name) + self.output_file_extension + ) + self.local_json_path = self.slugify(name) + ".json" + self.now = str(pendulum.now("utc")) + self.adls_dir_path = adls_dir_path + + if adls_file_name is not None: + self.adls_file_path = os.path.join(adls_dir_path, adls_file_name) + self.adls_schema_file_dir_file = os.path.join( + adls_dir_path, "schema", Path(adls_file_name).stem + ".json" + ) + + else: + self.adls_file_path = os.path.join( + adls_dir_path, self.now + self.output_file_extension + ) + self.adls_schema_file_dir_file = os.path.join( + adls_dir_path, "schema", self.now + ".json" + ) + + self.if_exists = if_exists + self.overwrite_adls = overwrite_adls + self.vault_name = vault_name + self.sp_credentials_secret = sp_credentials_secret + + super().__init__(*args, name=name, **kwargs) + self.gen_flow() + + @staticmethod + def slugify(name): + return name.replace(" ", "_").lower() + + def gen_flow(self) -> Flow: + sapbw_to_df_task = SAPBWToDF( + sapbw_credentials=self.sapbw_credentials, + sapbw_credentials_key=self.sapbw_credentials_key, + env=self.env, + ) + + df = sapbw_to_df_task.bind( + mdx_query=self.mdx_query, + mapping_dict=self.mapping_dict, + flow=self, + ) + + df_viadot_downloaded = add_ingestion_metadata_task.bind(df=df, flow=self) + dtypes_dict = df_get_data_types_task.bind(df_viadot_downloaded, flow=self) + + df_to_be_loaded = df_map_mixed_dtypes_for_parquet.bind( + df_viadot_downloaded, dtypes_dict, flow=self + ) + + if self.output_file_extension == ".parquet": + df_to_file = df_to_parquet.bind( + df=df_to_be_loaded, + path=self.local_file_path, + if_exists=self.if_exists, + flow=self, + ) + else: + df_to_file = df_to_csv.bind( + df=df_to_be_loaded, + path=self.local_file_path, + if_exists=self.if_exists, + flow=self, + ) + + file_to_adls_task = AzureDataLakeUpload() + adls_upload = file_to_adls_task.bind( + from_path=self.local_file_path, + to_path=self.adls_file_path, + overwrite=self.overwrite_adls, + sp_credentials_secret=self.sp_credentials_secret, + flow=self, + ) + + dtypes_updated = update_dtypes_dict(dtypes_dict, flow=self) + dtypes_to_json_task.bind( + dtypes_dict=dtypes_updated, local_json_path=self.local_json_path, flow=self + ) + + json_to_adls_task = AzureDataLakeUpload() + json_to_adls_task.bind( + from_path=self.local_json_path, + to_path=self.adls_schema_file_dir_file, + overwrite=self.overwrite_adls, + sp_credentials_secret=self.sp_credentials_secret, + vault_name=self.vault_name, + flow=self, + ) + + df_viadot_downloaded.set_upstream(df, flow=self) + dtypes_dict.set_upstream(df_viadot_downloaded, flow=self) + df_to_be_loaded.set_upstream(dtypes_dict, flow=self) + adls_upload.set_upstream(df_to_file, flow=self) + + df_to_file.set_upstream(dtypes_updated, flow=self) + json_to_adls_task.set_upstream(dtypes_to_json_task, flow=self) + + set_key_value(key=self.adls_dir_path, value=self.adls_file_path) diff --git a/viadot/flows/sql_server_to_parquet.py 
b/viadot/flows/sql_server_to_parquet.py new file mode 100644 index 000000000..ba54a0a7d --- /dev/null +++ b/viadot/flows/sql_server_to_parquet.py @@ -0,0 +1,56 @@ +from typing import Any, Dict, List, Literal + +from prefect import Flow + +from viadot.task_utils import df_to_parquet +from viadot.tasks import SQLServerToDF + + +class SQLServerToParquet(Flow): + def __init__( + self, + name, + sql_query: str, + local_file_path: str, + sqlserver_config_key: str = None, + if_exists: Literal["fail", "replace", "append", "skip", "delete"] = "fail", + timeout: int = 3600, + *args: List[any], + **kwargs: Dict[str, Any], + ): + """ + Flow for uploading data from SQL Server to Parquet file. + + Args: + name (str, required): The name of the flow. + sql_query (str, required): The query to execute on the SQL Server database. If don't start with "SELECT" + returns empty DataFrame. + local_file_path (str, required): Path to output parquet file. + sqlserver_config_key (str, optional): The key inside local config containing the credentials. Defaults to None. + if_exists (Literal, optional): What to do if the file already exists. Defaults to "fail". + timeout(int, optional): The amount of time (in seconds) to wait while running this task before + a timeout occurs. Defaults to 3600. + """ + # SQLServerToDF + self.sql_query = sql_query + self.sqlserver_config_key = sqlserver_config_key + self.timeout = timeout + + self.local_file_path = local_file_path + self.if_exists = if_exists + + super().__init__(*args, name=name, **kwargs) + + self.gen_flow() + + def gen_flow(self) -> Flow: + df_task = SQLServerToDF(timeout=self.timeout) + df = df_task.bind( + config_key=self.sqlserver_config_key, query=self.sql_query, flow=self + ) + parquet = df_to_parquet.bind( + df=df, + path=self.local_file_path, + if_exists=self.if_exists, + flow=self, + ) diff --git a/viadot/flows/vid_club_to_adls.py b/viadot/flows/vid_club_to_adls.py new file mode 100644 index 000000000..59d676c51 --- /dev/null +++ b/viadot/flows/vid_club_to_adls.py @@ -0,0 +1,196 @@ +import os +from pathlib import Path +from typing import Any, Dict, List, Literal + +import pandas as pd +import pendulum +from prefect import Flow +from prefect.backend import set_key_value +from prefect.utilities import logging + +from viadot.task_utils import ( + add_ingestion_metadata_task, + df_get_data_types_task, + df_map_mixed_dtypes_for_parquet, + df_to_csv, + df_to_parquet, + dtypes_to_json_task, + update_dtypes_dict, +) +from viadot.tasks import AzureDataLakeUpload, VidClubToDF + +logger = logging.get_logger(__name__) + + +class VidClubToADLS(Flow): + def __init__( + self, + name: str, + source: Literal["jobs", "product", "company", "survey"] = None, + from_date: str = "2022-03-22", + to_date: str = None, + items_per_page: int = 100, + region: Literal["bg", "hu", "hr", "pl", "ro", "si", "all"] = "all", + days_interval: int = 30, + cols_to_drop: List[str] = None, + vid_club_credentials: Dict[str, Any] = None, + vidclub_credentials_secret: str = "VIDCLUB", + vidclub_vault_name: str = None, + output_file_extension: str = ".parquet", + adls_dir_path: str = None, + local_file_path: str = None, + adls_file_name: str = None, + vault_name: str = None, + adls_sp_credentials_secret: str = None, + overwrite_adls: bool = False, + if_exists: str = "replace", + timeout: int = 3600, + *args: List[Any], + **kwargs: Dict[str, Any] + ): + """ + Flow for downloading data from the Vid Club via API to a CSV or Parquet file. + Then upload it to Azure Data Lake. 
+ + Args: + name (str): The name of the flow. + source (Literal["jobs", "product", "company", "survey"], optional): The endpoint source to be accessed. Defaults to None. + from_date (str, optional): Start date for the query, by default is the oldest date in the data 2022-03-22. + to_date (str, optional): End date for the query. By default None, which will be executed as datetime.today().strftime("%Y-%m-%d") in code. + items_per_page (int, optional): Number of entries per page. Defaults to 100. + region (Literal["bg", "hu", "hr", "pl", "ro", "si", "all"], optional): Region filter for the query. Defaults to "all". [July 2023 status: parameter works only for 'all' on API] + days_interval (int, optional): Days specified in date range per API call (test showed that 30-40 is optimal for performance). Defaults to 30. + cols_to_drop (List[str], optional): List of columns to drop. Defaults to None. + vid_club_credentials (Dict[str, Any], optional): Stores the credentials information. Defaults to None. + vidclub_credentials_secret (str, optional): The name of the secret in Azure Key Vault or Prefect or local_config file. Defaults to "VIDCLUB". + vidclub_vault_name (str, optional): For Vid Club credentials stored in Azure Key Vault. The name of the vault from which to obtain the secret. Defaults to None. + output_file_extension (str, optional): Output file extension - to allow selection of .csv for data + which is not easy to handle with parquet. Defaults to ".parquet". + adls_dir_path (str, optional): Azure Data Lake destination folder/catalog path. Defaults to None. + local_file_path (str, optional): Local destination path. Defaults to None. + adls_file_name (str, optional): Name of file in ADLS. Defaults to None. + vault_name (str, optional): For ADLS credentials stored in Azure Key Vault. The name of the vault from which to obtain the secret. Defaults to None. + adls_sp_credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary with + ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET) for the Azure Data Lake. + Defaults to None. + overwrite_adls (bool, optional): Whether to overwrite files in the lake. Defaults to False. + if_exists (str, optional): What to do if the file exists. Defaults to "replace". + timeout (int, optional): The time (in seconds) to wait while running this task before a timeout occurs. Defaults to 3600. 
+ """ + # VidClubToDF + self.source = source + self.from_date = from_date + self.to_date = to_date + self.items_per_page = items_per_page + self.region = region + self.days_interval = days_interval + self.cols_to_drop = cols_to_drop + self.vid_club_credentials = vid_club_credentials + self.vidclub_credentials_secret = vidclub_credentials_secret + self.vidclub_vault_name = vidclub_vault_name + + # AzureDataLakeUpload + self.adls_file_name = adls_file_name + self.adls_dir_path = adls_dir_path + self.local_file_path = local_file_path + self.overwrite = overwrite_adls + self.vault_name = vault_name + self.adls_sp_credentials_secret = adls_sp_credentials_secret + self.if_exists = if_exists + self.output_file_extension = output_file_extension + self.timeout = timeout + self.now = str(pendulum.now("utc")) + + self.local_file_path = ( + local_file_path or self.slugify(name) + self.output_file_extension + ) + self.local_json_path = self.slugify(name) + ".json" + self.adls_dir_path = adls_dir_path + + if adls_file_name is not None: + self.adls_file_path = os.path.join(adls_dir_path, adls_file_name) + self.adls_schema_file_dir_file = os.path.join( + adls_dir_path, "schema", Path(adls_file_name).stem + ".json" + ) + else: + self.adls_file_path = os.path.join( + adls_dir_path, self.now + self.output_file_extension + ) + self.adls_schema_file_dir_file = os.path.join( + adls_dir_path, "schema", self.now + ".json" + ) + + super().__init__(*args, name=name, **kwargs) + + self.gen_flow() + + @staticmethod + def slugify(name): + return name.replace(" ", "_").lower() + + def gen_flow(self) -> Flow: + vid_club_df_task = VidClubToDF( + timeout=self.timeout, + source=self.source, + credentials=self.vid_club_credentials, + credentials_secret=self.vidclub_credentials_secret, + vault_name=self.vidclub_vault_name, + ) + + vid_club_df = vid_club_df_task.bind( + from_date=self.from_date, + to_date=self.to_date, + items_per_page=self.items_per_page, + region=self.region, + days_interval=self.days_interval, + cols_to_drop=self.cols_to_drop, + flow=self, + ) + + df_with_metadata = add_ingestion_metadata_task.bind(vid_club_df, flow=self) + + dtypes_dict = df_get_data_types_task.bind(df_with_metadata, flow=self) + df_mapped = df_map_mixed_dtypes_for_parquet.bind( + df_with_metadata, dtypes_dict, flow=self + ) + + if self.output_file_extension == ".parquet": + df_to_file = df_to_parquet.bind( + df=df_mapped, + path=self.local_file_path, + if_exists=self.if_exists, + flow=self, + ) + else: + df_to_file = df_to_csv.bind( + df=df_with_metadata, + path=self.local_file_path, + if_exists=self.if_exists, + flow=self, + ) + + file_to_adls_task = AzureDataLakeUpload(timeout=self.timeout) + file_to_adls_task.bind( + from_path=self.local_file_path, + to_path=self.adls_file_path, + overwrite=self.overwrite, + sp_credentials_secret=self.adls_sp_credentials_secret, + flow=self, + ) + + dtypes_updated = update_dtypes_dict(dtypes_dict, flow=self) + dtypes_to_json_task.bind( + dtypes_dict=dtypes_updated, local_json_path=self.local_json_path, flow=self + ) + json_to_adls_task = AzureDataLakeUpload(timeout=self.timeout) + json_to_adls_task.bind( + from_path=self.local_json_path, + to_path=self.adls_schema_file_dir_file, + overwrite=self.overwrite, + sp_credentials_secret=self.adls_sp_credentials_secret, + flow=self, + ) + + file_to_adls_task.set_upstream(df_to_file, flow=self) + json_to_adls_task.set_upstream(dtypes_to_json_task, flow=self) + set_key_value(key=self.adls_dir_path, value=self.adls_file_path) diff --git 
a/viadot/sources/__init__.py b/viadot/sources/__init__.py index 267fcbb35..094cec14e 100644 --- a/viadot/sources/__init__.py +++ b/viadot/sources/__init__.py @@ -16,6 +16,11 @@ except ImportError: pass +try: + from .sap_bw import SAPBW +except ImportError: + pass + from .business_core import BusinessCore from .customer_gauge import CustomerGauge from .duckdb import DuckDB diff --git a/viadot/sources/business_core.py b/viadot/sources/business_core.py index 2409f5ffe..3495cecd9 100644 --- a/viadot/sources/business_core.py +++ b/viadot/sources/business_core.py @@ -1,8 +1,8 @@ -import pandas as pd import json -from prefect.utilities import logging from typing import Any, Dict, Literal +import pandas as pd +from prefect.utilities import logging from ..config import local_config from ..exceptions import APIError, CredentialError diff --git a/viadot/sources/genesys.py b/viadot/sources/genesys.py index 15b466e64..4556ededf 100644 --- a/viadot/sources/genesys.py +++ b/viadot/sources/genesys.py @@ -8,9 +8,8 @@ import aiohttp import pandas as pd -from datetime import datetime, timedelta -from aiolimiter import AsyncLimiter import prefect +from aiolimiter import AsyncLimiter from prefect.engine import signals from viadot.config import local_config @@ -33,7 +32,6 @@ def __init__( credentials_genesys: Dict[str, Any] = None, environment: str = None, report_url: str = None, - schedule_id: str = None, report_columns: List[str] = None, *args: List[Any], **kwargs: Dict[str, Any], @@ -77,7 +75,6 @@ def __init__( super().__init__(*args, credentials=self.credentials_genesys, **kwargs) self.view_type = view_type - self.schedule_id = schedule_id self.report_name = report_name self.environment = environment self.report_url = report_url @@ -89,22 +86,9 @@ def __init__( self.ids_mapping = ids_mapping self.count = iter(range(99999)) - if self.schedule_id is None: - self.schedule_id = self.credentials.get("SCHEDULE_ID", None) - if self.environment is None: self.environment = self.credentials.get("ENVIRONMENT", None) - if self.ids_mapping is None: - self.ids_mapping = self.credentials.get("IDS_MAPPING", None) - - if type(self.ids_mapping) is dict and self.ids_mapping is not None: - self.logger.info("IDS_MAPPING loaded from local credential.") - else: - self.logger.warning( - "IDS_MAPPING is not provided in you credentials or is not a dictionary." - ) - self.report_data = [] @property @@ -151,14 +135,22 @@ def authorization_token(self, verbose: bool = False): return request_headers - def genesys_generate_exports( - self, post_data_list: List[str], end_point: str = "reporting/exports" + def genesys_api_connection( + self, + post_data_list: List[str], + end_point: str = "analytics/reporting/exports", + params: Dict[str, Any] = None, + method: Literal["POST", "GET"] = "POST", + sleep_time: int = 0.5, ) -> Optional[dict]: - """Function that make POST request method to generate export reports. + """Function that make POST request method to Genesys API given and endpoint. Args: post_data_list (List[str], optional): List of string templates to generate json body. Defaults to None. - end_point (str, optional): Final end point for Genesys connection. Defaults to "reporting/exports". + end_point (str, optional): Final end point for Genesys connection. Defaults to "analytics/reporting/exports". + params (Dict[str, Any], optional): Parameters to be passed into the POST call. Defaults to None. + method (Literal["POST", "GET"], optional): Type of connection to the API. Defaults to "POST". 
+ sleep_time (int, optional): The time, in seconds, to sleep the script between calls to the API. Defaults to 0.5 Returns: Optional[dict]: Dict when the "conversations" endpoint is called, otherwise returns None. @@ -166,18 +158,17 @@ def genesys_generate_exports( limiter = AsyncLimiter(2, 15) semaphore = asyncio.Semaphore(value=1) + url = f"https://api.{self.environment}/api/v2/{end_point}" async def generate_post(): - cnt = 0 - for data_to_post in post_data_list: - if cnt < 10: - payload = json.dumps(data_to_post) - async with aiohttp.ClientSession() as session: - await semaphore.acquire() - async with limiter: + payload = json.dumps(data_to_post) + async with aiohttp.ClientSession() as session: + await semaphore.acquire() + async with limiter: + if method == "POST": async with session.post( - f"https://api.{self.environment}/api/v2/analytics/{end_point}", + url, headers=self.authorization_token, data=payload, ) as resp: @@ -187,17 +178,21 @@ async def generate_post(): f"Generated report export --- \n {payload}." ) semaphore.release() - cnt += 1 - else: - await asyncio.sleep(3) - cnt = 0 + elif method == "GET": + async with session.get( + url, + headers=self.authorization_token, + params=params, + ) as resp: + new_report = await resp.read() + semaphore.release() + await asyncio.sleep(sleep_time) loop = asyncio.get_event_loop() coroutine = generate_post() loop.run_until_complete(coroutine) - if end_point == "conversations/details/query": - return json.loads(new_report.decode("utf-8")) + return json.loads(new_report.decode("utf-8")) def load_reporting_exports(self, page_size: int = 100, verbose: bool = False): """ @@ -322,21 +317,17 @@ def download_all_reporting_exports( ) continue - if single_report[4].lower() == "queue_performance_detail_view": - file_name = ( - temp_ids_mapping.get(single_report[2]) + "_" + single_report[3] - ).upper() + date = self.start_date.replace("-", "") + if single_report[4].lower() in [ + "queue_performance_detail_view", + "agent_status_detail_view", + ]: + file_name = f"{self.view_type.upper()}_{next(self.count)}_{date}" elif single_report[4].lower() in [ "agent_performance_summary_view", "agent_status_summary_view", ]: - date = self.start_date.replace("-", "") file_name = self.view_type.upper() + "_" + f"{date}" - elif single_report[4].lower() in [ - "agent_status_detail_view", - ]: - date = self.start_date.replace("-", "") - file_name = self.view_type.upper() + f"_{next(self.count)}_" + f"{date}" else: raise signals.SKIP( message=f"View type {self.view_type} not defined in viadot, yet..." @@ -357,36 +348,6 @@ def download_all_reporting_exports( self.logger.info("Successfully genetared file names list.") return file_name_list - def generate_reporting_export( - self, data_to_post: Dict[str, Any], verbose: bool = False - ) -> int: - """ - POST method for reporting export. - - Args: - data_to_post (Dict[str, Any]): json format of POST body. - verbose (bool, optional): Decide if enable logging. - - Returns: - new_report.status_code: status code - """ - payload = json.dumps(data_to_post) - new_report = handle_api_response( - url=f"https://api.{self.environment}/api/v2/analytics/reporting/exports", - headers=self.authorization_token, - method="POST", - body=payload, - ) - if verbose: - if new_report.status_code == 200: - self.logger.info("Succesfully generated new export.") - else: - self.logger.error( - f"Failed to generated new export. 
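The rewritten `genesys_api_connection` above funnels both POST and GET calls through a single coroutine guarded by an `AsyncLimiter` and a semaphore. A self-contained sketch of that pattern against a placeholder endpoint (httpbin, not the Genesys API), assuming `aiohttp` and `aiolimiter` are available:

```python
import asyncio
import json

import aiohttp
from aiolimiter import AsyncLimiter

URL = "https://httpbin.org/anything"  # placeholder endpoint, not the Genesys API


async def api_connection(post_data_list, method="POST", params=None, sleep_time=0.5):
    """One coroutine for both POST and GET, throttled to 2 calls per 15 seconds."""
    limiter = AsyncLimiter(2, 15)
    semaphore = asyncio.Semaphore(value=1)
    response_body = None
    for data_to_post in post_data_list:
        payload = json.dumps(data_to_post)
        async with aiohttp.ClientSession() as session:
            await semaphore.acquire()
            async with limiter:
                if method == "POST":
                    async with session.post(URL, data=payload) as resp:
                        response_body = await resp.read()
                elif method == "GET":
                    async with session.get(URL, params=params) as resp:
                        response_body = await resp.read()
                semaphore.release()
        await asyncio.sleep(sleep_time)
    return json.loads(response_body.decode("utf-8"))


print(asyncio.run(api_connection([{"interval": "2023-01-01/2023-01-31"}])))
```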
- {new_report.content}" - ) - raise APIError("Failed to generated new export.") - return new_report.status_code - def delete_reporting_exports(self, report_id) -> int: """DELETE method for deleting particular reporting exports. @@ -421,116 +382,3 @@ def delete_all_reporting_exports(self) -> None: assert status_code < 300 self.logger.info("Successfully removed all reports.") - - def get_analitics_url_report(self) -> str: - """ - Fetching analytics report url from json response. - - Returns: - string: url for analytics report - """ - response = handle_api_response( - url=f"https://api.{self.environment}/api/v2/analytics/reporting/schedules/{self.schedule_id}", - headers=self.authorization_token, - ) - try: - response_json = response.json() - report_url = response_json.get("lastRun", None).get("reportUrl", None) - self.logger.info("Successfully downloaded report from genesys api") - return report_url - except AttributeError as e: - self.logger.error( - "Output data error: " + str(type(e).__name__) + ": " + str(e) - ) - - def get_all_schedules_job(self) -> json: - """ - Fetching analytics report url from json response. - - Returns: - string: json body with all schedules jobs. - """ - response = handle_api_response( - url=f"https://api.{self.environment}/api/v2/analytics/reporting/schedules", - headers=self.authorization_token, - ) - try: - response_json = response.json() - self.logger.info("Successfully downloaded schedules jobs.") - return response_json - except AttributeError as e: - self.logger.error( - "Output data error: " + str(type(e).__name__) + ": " + str(e) - ) - - def schedule_report(self, data_to_post: Dict[str, Any]) -> json: - """ - POST method for report scheduling. - - Args: - data_to_post (Dict[str, Any]): json format of POST body. - - Returns: - new_report.status_code: status code - """ - payload = json.dumps(data_to_post) - new_report = handle_api_response( - url=f"https://api.{self.environment}/api/v2/analytics/reporting/schedules", - headers=self.authorization_token, - method="POST", - body=payload, - ) - if new_report.status_code == 200: - self.logger.info("Succesfully scheduled new report.") - else: - self.logger.error(f"Failed to scheduled new report. - {new_report.content}") - raise APIError("Failed to scheduled new report.") - return new_report.status_code - - def to_df(self, report_url: str = None): - """Download genesys data into a pandas DataFrame. - - Args: - report_url (str): Report url from api response. - - Returns: - pd.DataFrame: the DataFrame with time range - """ - if report_url is None: - report_url = self.get_analitics_url_report - response_file = handle_api_response( - url=f"{report_url}", headers=self.authorization_token - ) - if self.report_columns is None: - df = pd.read_excel(response_file.content, header=6) - else: - df = pd.read_excel( - response_file.content, names=self.report_columns, skiprows=6 - ) - - return df - - def delete_scheduled_report_job(self, report_id: str): - """DELETE method for deleting particular report job. - - Args: - report_id (str): defined at the end of report url. - - Returns: - delete_method.status_code: status code - """ - delete_method = handle_api_response( - url=f"https://api.{self.environment}/api/v2/analytics/reporting/schedules/{report_id}", - headers=self.authorization_token, - method="DELETE", - ) - if delete_method.status_code == 200: - self.logger.info("Successfully deleted report from Genesys API.") - - else: - self.logger.error( - f"Failed to deleted report from Genesys API. 
- {delete_method.content}" - ) - raise APIError("Failed to deleted report from Genesys API.") - - return delete_method.status_code diff --git a/viadot/sources/outlook.py b/viadot/sources/outlook.py index 86caff298..266b28b45 100644 --- a/viadot/sources/outlook.py +++ b/viadot/sources/outlook.py @@ -1,11 +1,10 @@ import sys -import pytz from datetime import date, datetime, timedelta from typing import Any, Dict, List -import prefect import pandas as pd import prefect +import pytz from O365 import Account from O365.mailbox import MailBox @@ -149,6 +148,7 @@ def _get_messages_from_mailbox( self, dict_folder: dict, limit: int = 10000, + address_limit: int = 8000, outbox_list: List[str] = ["Sent Items"], ) -> list: """To retrieve all messages from all the mailboxes passed in the dictionary. @@ -157,6 +157,8 @@ def _get_messages_from_mailbox( dict_folder (dict): Mailboxes dictionary holder, with the following structure: "parent (sub)folder|(sub)folder": Mailbox. limit (int, optional): Number of fetched top messages. Defaults to 10000. + address_limit (int, optional): The maximum number of accepted characters in the sum + of all email names. Defaults to 8000. outbox_list (List[str], optional): List of outbox folders to differenciate between Inboxes and Outboxes. Defaults to ["Sent Items"]. @@ -182,10 +184,16 @@ def _get_messages_from_mailbox( recivers_list = fetched.get("toRecipients") recivers = " " if recivers_list is not None: - recivers = ", ".join( - reciver["emailAddress"]["address"] - for reciver in recivers_list - ) + for reciver in recivers_list: + add_string = f", {reciver['emailAddress']['address']}" + if ( + sum(list(map(len, [recivers, add_string]))) + >= address_limit + ): + break + else: + recivers += add_string + categories = " " if message.categories is not None: categories = ", ".join( @@ -194,14 +202,18 @@ def _get_messages_from_mailbox( conversation_index = " " if message.conversation_index is not None: conversation_index = message.conversation_index + if isinstance(message.subject, str): + subject = message.subject.replace("\t", " ") + else: + subject = message.subject row = { "(sub)folder": value.name, "conversation ID": fetched.get("conversationId"), "conversation index": conversation_index, "categories": categories, "sender": sender_mail, - "subject": message.subject, - "recivers": recivers, + "subject": subject, + "recivers": recivers.strip(", "), "received_time": fetched.get("receivedDateTime"), "mail_adress": self.mailbox_name.split("@")[0] .replace(".", "_") @@ -213,7 +225,8 @@ def _get_messages_from_mailbox( row["Inbox"] = True data.append(row) - self.logger.info(f"folder: {key.ljust(76, '-')} messages: {count}") + if count > 0: + self.logger.info(f"folder: {key.ljust(76, '-')} messages: {count}") return data diff --git a/viadot/sources/sap_bw.py b/viadot/sources/sap_bw.py new file mode 100644 index 000000000..73b1d2efa --- /dev/null +++ b/viadot/sources/sap_bw.py @@ -0,0 +1,129 @@ +import textwrap +from typing import List + +from pyrfc import Connection + +from viadot.exceptions import CredentialError, ValidationError +from viadot.sources.base import Source + + +class SAPBW(Source): + """ + A class for quering the SAP BW (SAP Business Warehouse) source using pyrfc library. 
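The `Outlook` fix above stops concatenating recipient addresses once the joined string would exceed `address_limit` characters. A standalone sketch of that guard, with invented addresses:

```python
def join_receivers(addresses, address_limit=8000):
    """Concatenate e-mail addresses, stopping before the total length exceeds the limit."""
    receivers = " "
    for address in addresses:
        add_string = f", {address}"
        if len(receivers) + len(add_string) >= address_limit:
            break
        receivers += add_string
    return receivers.strip(", ")


print(join_receivers(["a@example.com", "b@example.com"], address_limit=40))
# 'a@example.com, b@example.com'
```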
+ Documentation to pyrfc can be found under: https://sap.github.io/PyRFC/pyrfc.html + Documentation for SAP connection modules under: https://www.se80.co.uk/sap-function-modules/list/?index=rsr_mdx + + """ + + def __init__(self, credentials: dict, *args, **kwargs): + """ + Create an instance of the SAPBW class. + + Args: + credentials (dict): Credentials to connect with SAP BW containing ashost, sysnr, user, passwd, client. + + Raises: + CredentialError: If provided credentials are incorrect. + """ + if credentials is None: + raise CredentialError("Missing credentials.") + + super().__init__(*args, credentials=credentials, **kwargs) + + def get_connection(self) -> Connection: + """ + Function to create the connection with SAP BW. + + Returns: + Connection: Connection to SAP. + """ + return Connection( + ashost=self.credentials.get("ashost"), + sysnr=self.credentials.get("sysnr"), + user=self.credentials.get("user"), + passwd=self.credentials.get("passwd"), + client=self.credentials.get("client"), + ) + + def get_all_available_columns(self, mdx_query: str) -> List: + """ + Function to generate list of all available columns in the SAP table based on passed MDX query. + + Args: + mdx_query (str): The MDX query to be passed to connection. + + Returns: + all_available_columns: List of all available columns in the source table. + """ + + conn = self.get_connection() + query = textwrap.wrap( + mdx_query, width=75 + ) # width = 75, to properly split mdx query into substrings passed to SAP object creation function + + properties = conn.call("RSR_MDX_CREATE_STORED_OBJECT", COMMAND_TEXT=query) + datasetid = properties["DATASETID"] + + if properties["RETURN"]["MESSAGE"] == "": + get_axis_info = conn.call( + "RSR_MDX_GET_AXIS_INFO", DATASETID=datasetid + ) # listing all of available columns and metrics + cols = get_axis_info["AXIS_DIMENSIONS"] + + all_available_columns = [x["DIM_UNAM"] for x in cols] + else: + all_available_columns = [] + self.logger.error(properties["RETURN"]["MESSAGE"]) + + return all_available_columns + + def get_output_data(self, mdx_query: str) -> dict: + """ + Function to generate the SAP output dataset from MDX query. + + Args: + mdx_query (str): The MDX query to be passed to connection. + + Returns: + query_output: SAP output dictionary in JSON format that contains data rows and column headers. + + Example output: + + { + "RETURN": { + "TYPE": "", + "ID": "", + "NUMBER": "000", + "MESSAGE": "",... + },... + "DATA": [ + { + "COLUMN": 0, + "ROW": 0, + "DATA": "VELUX Deutschland GmbH", + "VALUE_DATA_TYPE": "CHAR", + "CELL_STATUS": "" + },... + ], + "HEADER": [ + { + "COLUMN": 0, + "ROW": 0, + "DATA": "[0COMP_CODE].[LEVEL01].[DESCRIPTION]", + "VALUE_DATA_TYPE": "CHAR", + "CELL_STATUS": "" + },... 
+ ] + } + + """ + conn = self.get_connection() + query = textwrap.wrap( + mdx_query, 75 + ) # width = 75, to properly split mdx query into substrings passed to SAP object creation function + properties = conn.call("RSR_MDX_CREATE_OBJECT", COMMAND_TEXT=query) + + datasetid = properties["DATASETID"] + query_output = conn.call("RSR_MDX_GET_FLAT_DATA", DATASETID=datasetid) + + return query_output diff --git a/viadot/sources/vid_club.py b/viadot/sources/vid_club.py index 207691fe1..e7819577a 100644 --- a/viadot/sources/vid_club.py +++ b/viadot/sources/vid_club.py @@ -1,16 +1,18 @@ import json import os import urllib -from datetime import datetime -from typing import Any, Dict, List, Literal +from datetime import date, datetime, timedelta +from typing import Any, Dict, List, Literal, Tuple import pandas as pd +from prefect.utilities import logging -from ..config import local_config from ..exceptions import CredentialError, ValidationError from ..utils import handle_api_response from .base import Source +logger = logging.get_logger() + class VidClub(Source): """ @@ -18,26 +20,18 @@ class VidClub(Source): Documentation for this API is located at: https://evps01.envoo.net/vipapi/ There are 4 endpoints where to get the data. - """ - def __init__(self, *args, credentials: Dict[str, Any] = None, **kwargs): + def __init__(self, credentials: Dict[str, Any], *args, **kwargs): """ Create an instance of VidClub. Args: - credentials (dict, optional): Credentials to Vid Club APIs. - Defaults to dictionary. + credentials (Dict[str, Any]): Credentials to Vid Club APIs containing token. Raises: - CredentialError: If credentials are not provided in local_config or directly as a parameter. + CredentialError: If credentials are not provided as a parameter. """ - - DEFAULT_CREDENTIALS = local_config.get("VIDCLUB") - credentials = kwargs.pop("credentials", DEFAULT_CREDENTIALS) - if credentials is None: - raise CredentialError("Missing credentials.") - self.headers = { "Authorization": "Bearer " + credentials["token"], "Content-Type": "application/json", @@ -47,22 +41,23 @@ def __init__(self, *args, credentials: Dict[str, Any] = None, **kwargs): def build_query( self, - source: Literal["jobs", "product", "company", "survey"], from_date: str, to_date: str, api_url: str, items_per_page: int, + source: Literal["jobs", "product", "company", "survey"] = None, + region: Literal["bg", "hu", "hr", "pl", "ro", "si", "all"] = "all", ) -> str: """ Builds the query from the inputs. Args: - source (str): The endpoint source to be accessed, has to be among these: - ['jobs', 'product', 'company', 'survey']. from_date (str): Start date for the query. - to_date (str): End date for the query, if empty, datetime.today() will be used. - api_url (str): Generic part of the URL. + to_date (str): End date for the query, if empty, will be executed as datetime.today().strftime("%Y-%m-%d"). + api_url (str): Generic part of the URL to Vid Club API. items_per_page (int): number of entries per page. + source (Literal["jobs", "product", "company", "survey"], optional): The endpoint source to be accessed. Defaults to None. + region (Literal["bg", "hu", "hr", "pl", "ro", "si", "all"], optional): Region filter for the query. Defaults to "all". [July 2023 status: parameter works only for 'all' on API] Returns: str: Final query with all filters added. @@ -71,8 +66,8 @@ def build_query( ValidationError: If any source different than the ones in the list are used. 
""" if source in ["jobs", "product", "company"]: - url = f"{api_url}{source}?from={from_date}&to={to_date}&limit={items_per_page}" - elif source in "survey": + url = f"{api_url}{source}?from={from_date}&to={to_date}®ion={region}&limit={items_per_page}" + elif source == "survey": url = f"{api_url}{source}?language=en&type=question" else: raise ValidationError( @@ -80,70 +75,146 @@ def build_query( ) return url + def intervals( + self, from_date: str, to_date: str, days_interval: int + ) -> Tuple[List[str], List[str]]: + """ + Breaks dates range into smaller by provided days interval. + + Args: + from_date (str): Start date for the query in "%Y-%m-%d" format. + to_date (str): End date for the query, if empty, will be executed as datetime.today().strftime("%Y-%m-%d"). + days_interval (int): Days specified in date range per api call (test showed that 30-40 is optimal for performance). + + Returns: + List[str], List[str]: Starts and Ends lists that contains information about date ranges for specific period and time interval. + + Raises: + ValidationError: If the final date of the query is before the start date. + """ + + if to_date == None: + to_date = datetime.today().strftime("%Y-%m-%d") + + end_date = datetime.strptime(to_date, "%Y-%m-%d").date() + start_date = datetime.strptime(from_date, "%Y-%m-%d").date() + + from_date_obj = datetime.strptime(from_date, "%Y-%m-%d") + + to_date_obj = datetime.strptime(to_date, "%Y-%m-%d") + delta = to_date_obj - from_date_obj + + if delta.days < 0: + raise ValidationError("to_date cannot be earlier than from_date.") + + interval = timedelta(days=days_interval) + starts = [] + ends = [] + + period_start = start_date + while period_start < end_date: + period_end = min(period_start + interval, end_date) + starts.append(period_start.strftime("%Y-%m-%d")) + ends.append(period_end.strftime("%Y-%m-%d")) + period_start = period_end + if len(starts) == 0 and len(ends) == 0: + starts.append(from_date) + ends.append(to_date) + return starts, ends + + def check_connection( + self, + source: Literal["jobs", "product", "company", "survey"] = None, + from_date: str = "2022-03-22", + to_date: str = None, + items_per_page: int = 100, + region: Literal["bg", "hu", "hr", "pl", "ro", "si", "all"] = "all", + url: str = None, + ) -> Tuple[Dict[str, Any], str]: + """ + Initiate first connection to API to retrieve piece of data with information about type of pagination in API URL. + This option is added because type of pagination for endpoints is being changed in the future from page number to 'next' id. + + Args: + source (Literal["jobs", "product", "company", "survey"], optional): The endpoint source to be accessed. Defaults to None. + from_date (str, optional): Start date for the query, by default is the oldest date in the data 2022-03-22. + to_date (str, optional): End date for the query. By default None, which will be executed as datetime.today().strftime("%Y-%m-%d") in code. + items_per_page (int, optional): Number of entries per page. 100 entries by default. + region (Literal["bg", "hu", "hr", "pl", "ro", "si", "all"], optional): Region filter for the query. Defaults to "all". [July 2023 status: parameter works only for 'all' on API] + url (str, optional): Generic part of the URL to Vid Club API. Defaults to None. + + Returns: + Tuple[Dict[str, Any], str]: Dictionary with first response from API with JSON containing data and used URL string. + + Raises: + ValidationError: If from_date is earlier than 2022-03-22. + ValidationError: If to_date is earlier than from_date. 
+ """ + + if from_date < "2022-03-22": + raise ValidationError("from_date cannot be earlier than 2022-03-22.") + + if to_date < from_date: + raise ValidationError("to_date cannot be earlier than from_date.") + + if url is None: + url = self.credentials["url"] + + first_url = self.build_query( + source=source, + from_date=from_date, + to_date=to_date, + api_url=url, + items_per_page=items_per_page, + region=region, + ) + headers = self.headers + response = handle_api_response( + url=first_url, headers=headers, method="GET", verify=False + ) + response = response.json() + + return (response, first_url) + def get_response( self, - source: Literal["jobs", "product", "company", "survey"], + source: Literal["jobs", "product", "company", "survey"] = None, from_date: str = "2022-03-22", to_date: str = None, items_per_page: int = 100, - region="null", + region: Literal["bg", "hu", "hr", "pl", "ro", "si", "all"] = "all", ) -> pd.DataFrame: """ - Gets the response from the API queried and transforms it into DataFrame. + Basing on the pagination type retrieved using check_connection function, gets the response from the API queried and transforms it into DataFrame. Args: - source (str): The endpoint source to be accessed, has to be among these: - ['jobs', 'product', 'company', 'survey']. + source (Literal["jobs", "product", "company", "survey"], optional): The endpoint source to be accessed. Defaults to None. from_date (str, optional): Start date for the query, by default is the oldest date in the data 2022-03-22. - to_date (str, optional): End date for the query. By default, datetime.today() will be used. + to_date (str, optional): End date for the query. By default None, which will be executed as datetime.today().strftime("%Y-%m-%d") in code. items_per_page (int, optional): Number of entries per page. 100 entries by default. - region (str, optinal): Region filter for the query. By default, it is empty. + region (Literal["bg", "hu", "hr", "pl", "ro", "si", "all"], optional): Region filter for the query. Defaults to "all". [July 2023 status: parameter works only for 'all' on API] Returns: pd.DataFrame: Table of the data carried in the response. Raises: ValidationError: If any source different than the ones in the list are used. - ValidationError: If the initial date of the query is before the oldest date in the data (2023-03-22). - ValidationError: If the final date of the query is before the start date. 
""" - - # Dealing with bad arguments + headers = self.headers if source not in ["jobs", "product", "company", "survey"]: raise ValidationError( "The source has to be: jobs, product, company or survey" ) if to_date == None: - datetime.today().strftime("%Y-%m-%d") + to_date = datetime.today().strftime("%Y-%m-%d") - from_date_obj = datetime.strptime(from_date, "%Y-%m-%d") - oldest_date_obj = datetime.strptime("2022-03-22", "%Y-%m-%d") - delta = from_date_obj - oldest_date_obj - - if delta.days < 0: - raise ValidationError("from_date cannot be earlier than 2023-03-22!!") - - to_date_obj = datetime.strptime(to_date, "%Y-%m-%d") - delta = to_date_obj - from_date_obj - - if delta.days < 0: - raise ValidationError("to_date cannot be earlier than from_date!") - - # Preparing the Query - first_url = self.build_query( - source, - from_date, - to_date, - self.credentials["url"], + response, first_url = self.check_connection( + source=source, + from_date=from_date, + to_date=to_date, items_per_page=items_per_page, + region=region, ) - headers = self.headers - - # Getting first page - response = handle_api_response(url=first_url, headers=headers, method="GET") - - # Next Pages - response = response.json() if isinstance(response, dict): keys_list = list(response.keys()) @@ -152,24 +223,93 @@ def get_response( else: keys_list = [] + if "next" in keys_list: + ind = True + else: + ind = False + if "data" in keys_list: - # first page content df = pd.DataFrame(response["data"]) length = df.shape[0] - page = 2 + page = 1 while length == items_per_page: - url = f"{first_url}&page={page}" - r = handle_api_response(url=url, headers=headers, method="GET") + if ind == True: + next = response["next"] + url = f"{first_url}&next={next}" + else: + page += 1 + url = f"{first_url}&page={page}" + r = handle_api_response( + url=url, headers=headers, method="GET", verify=False + ) response = r.json() df_page = pd.DataFrame(response["data"]) if source == "product": df_page = df_page.transpose() length = df_page.shape[0] df = pd.concat((df, df_page), axis=0) - page += 1 - else: df = pd.DataFrame(response) return df + + def total_load( + self, + source: Literal["jobs", "product", "company", "survey"] = None, + from_date: str = "2022-03-22", + to_date: str = None, + items_per_page: int = 100, + region: Literal["bg", "hu", "hr", "pl", "ro", "si", "all"] = "all", + days_interval: int = 30, + ) -> pd.DataFrame: + """ + Looping get_response and iterating by date ranges defined in intervals. Stores outputs as DataFrames in a list. + At the end, daframes are concatenated in one and dropped duplicates that would appear when quering. + + Args: + source (Literal["jobs", "product", "company", "survey"], optional): The endpoint source to be accessed. Defaults to None. + from_date (str, optional): Start date for the query, by default is the oldest date in the data 2022-03-22. + to_date (str, optional): End date for the query. By default None, which will be executed as datetime.today().strftime("%Y-%m-%d") in code. + items_per_page (int, optional): Number of entries per page. 100 entries by default. + region (Literal["bg", "hu", "hr", "pl", "ro", "si", "all"], optional): Region filter for the query. Defaults to "all". [July 2023 status: parameter works only for 'all' on API] + days_interval (int, optional): Days specified in date range per api call (test showed that 30-40 is optimal for performance). Defaults to 30. + + Returns: + pd.DataFrame: Dataframe of the concatanated data carried in the responses. 
+ """ + + starts, ends = self.intervals( + from_date=from_date, to_date=to_date, days_interval=days_interval + ) + + dfs_list = [] + if len(starts) > 0 and len(ends) > 0: + for start, end in zip(starts, ends): + logger.info(f"ingesting data for dates [{start}]-[{end}]...") + df = self.get_response( + source=source, + from_date=start, + to_date=end, + items_per_page=items_per_page, + region=region, + ) + dfs_list.append(df) + if len(dfs_list) > 1: + df = pd.concat(dfs_list, axis=0, ignore_index=True) + else: + df = pd.DataFrame(dfs_list[0]) + else: + df = self.get_response( + source=source, + from_date=from_date, + to_date=to_date, + items_per_page=items_per_page, + region=region, + ) + df.drop_duplicates(inplace=True) + + if df.empty: + logger.error("No data for this date range") + + return df diff --git a/viadot/tasks/__init__.py b/viadot/tasks/__init__.py index ad9cefa5d..4aded8cbe 100644 --- a/viadot/tasks/__init__.py +++ b/viadot/tasks/__init__.py @@ -40,6 +40,11 @@ except ImportError: pass +try: + from .sap_bw import SAPBWToDF +except ImportError: + pass + from .business_core import BusinessCoreToParquet from .customer_gauge import CustomerGaugeToDF from .duckdb import DuckDBCreateTableFromParquet, DuckDBQuery, DuckDBToDF diff --git a/viadot/tasks/genesys.py b/viadot/tasks/genesys.py index 7b26deb15..403ed7081 100644 --- a/viadot/tasks/genesys.py +++ b/viadot/tasks/genesys.py @@ -1,7 +1,6 @@ import os -import sys import time -from typing import Any, Dict, List, Literal +from typing import Any, Dict, List import numpy as np import pandas as pd @@ -27,11 +26,12 @@ def __init__( start_date: str = None, end_date: str = None, environment: str = None, - schedule_id: str = None, report_url: str = None, report_columns: List[str] = None, local_file_path: str = "", sep: str = "\t", + conversationId_list: List[str] = None, + key_list: List[str] = None, credentials_genesys: Dict[str, Any] = None, timeout: int = 3600, *args: List[Any], @@ -48,17 +48,17 @@ def __init__( end_date (str, optional): End date of the report. Defaults to None. environment (str, optional): Adress of host server. Defaults to None than will be used enviroment from credentials. - schedule_id (str, optional): The ID of report. Defaults to None. report_url (str, optional): The url of report generated in json response. Defaults to None. report_columns (List[str], optional): List of exisiting column in report. Defaults to None. local_file_path (str, optional): The local path from which to upload the file(s). Defaults to "". sep (str, optional): Separator in csv file. Defaults to "\t". + conversationId_list (List[str], optional): List of conversationId passed as attribute of GET method. Defaults to None. + key_list (List[str], optional): List of keys needed to specify the columns in the GET request method. Defaults to None. timeout(int, optional): The amount of time (in seconds) to wait while running this task before a timeout occurs. Defaults to 3600. 
""" self.logger = prefect.context.get("logger") - self.schedule_id = schedule_id self.report_name = report_name self.view_type = view_type self.environment = environment @@ -70,6 +70,8 @@ def __init__( self.credentials_genesys = credentials_genesys self.local_file_path = local_file_path self.sep = sep + self.conversationId_list = conversationId_list + self.key_list = key_list super().__init__( name=self.report_name, @@ -283,13 +285,14 @@ def merge_conversations_dfs(self, data_to_merge: list) -> DataFrame: "report_name", "view_type", "environment", - "schedule_id", "report_url", "post_data_list", "start_date", "end_date", "report_columns", "credentials_genesys", + "conversationId_list", + "key_list", ) def run( self, @@ -297,13 +300,14 @@ def run( view_type: str = None, view_type_time_sleep: int = 80, environment: str = None, - schedule_id: str = None, report_url: str = None, post_data_list: List[str] = None, - end_point: str = "reporting/exports", + end_point: str = "analytics/reporting/exports", start_date: str = None, end_date: str = None, report_columns: List[str] = None, + conversationId_list: List[str] = None, + key_list: List[str] = None, credentials_genesys: Dict[str, Any] = None, ) -> List[str]: """ @@ -314,15 +318,15 @@ def run( view_type (str, optional): The type of view export job to be created. Defaults to None. view_type_time_sleep (int, optional): Waiting time to retrieve data from Genesys API. Defaults to 80. post_data_list (List[str], optional): List of string templates to generate json body. Defaults to None. - end_point (str, optional): Final end point for Genesys connection. Defaults to "reporting/exports". + end_point (str, optional): Final end point for Genesys connection. Defaults to "analytics/reporting/exports". credentials_genesys (Dict[str, Any], optional): Credentials to connect with Genesys API containing CLIENT_ID. Defaults to None. start_date (str, optional): Start date of the report. Defaults to None. end_date (str, optional): End date of the report. Defaults to None. - environment (str, optional): Adress of host server. Defaults to None than will be used enviroment - from credentials. - schedule_id (str, optional): The ID of report. Defaults to None. + environment (str, optional): Adress of host server. Defaults to None than will be used enviroment from credentials. report_url (str, optional): The url of report generated in json response. Defaults to None. report_columns (List[str], optional): List of exisiting column in report. Defaults to None. + conversationId_list (List[str], optional): List of conversationId passed as attribute of GET method. Defaults to None. + key_list (List[str], optional): List of keys needed to specify the columns in the GET request method. Defaults to None. Returns: List[str]: List of file names. 
@@ -335,13 +339,12 @@ def run( start_date=start_date, end_date=end_date, environment=environment, - schedule_id=schedule_id, report_url=report_url, report_columns=report_columns, ) if view_type == "queue_performance_detail_view": - genesys.genesys_generate_exports( + genesys.genesys_api_connection( post_data_list=post_data_list, end_point=end_point ) @@ -356,7 +359,6 @@ def run( timeout = timeout_start + 60 # while loop with timeout while time.time() < timeout: - try: genesys.get_reporting_exports_data() urls = [col for col in np.array(genesys.report_data).T][1] @@ -375,7 +377,7 @@ def run( "agent_status_summary_view", "agent_status_detail_view", ]: - genesys.genesys_generate_exports( + genesys.genesys_api_connection( post_data_list=post_data_list, end_point=end_point ) logger.info( @@ -385,7 +387,7 @@ def run( genesys.get_reporting_exports_data() - if view_type is not None and end_point == "reporting/exports": + if view_type is not None and end_point == "analytics/reporting/exports": failed = [col for col in np.array(genesys.report_data).T][-1] if "FAILED" in failed and "COMPLETED" in failed: @@ -412,7 +414,7 @@ def run( return file_names - elif view_type is None and end_point == "conversations/details/query": + elif view_type is None and end_point == "analytics/conversations/details/query": if len(post_data_list) > 1: logger.error("Not available more than one body for this end-point.") raise signals.FAIL(message="Stopping the flow.") @@ -421,7 +423,7 @@ def run( page_counter = post_data_list[0]["paging"]["pageNumber"] merged_data = {} while not stop_loop: - report = genesys.genesys_generate_exports( + report = genesys.genesys_api_connection( post_data_list=post_data_list, end_point=end_point ) merged_data_frame = self.merge_conversations_dfs( @@ -458,3 +460,38 @@ def run( logger.info("Downloaded the data from the Genesys into the CSV.") return [file_name] + + elif view_type is None and end_point == "conversations": + data_list = [] + + for id in conversationId_list: + json_file = genesys.genesys_api_connection( + post_data_list=post_data_list, + end_point=f"{end_point}/{id}", + method="GET", + ) + logger.info(f"Generated webmsg_response for {id}") + + attributes = json_file["participants"][0]["attributes"] + temp_dict = { + key: value for (key, value) in attributes.items() if key in key_list + } + temp_dict["conversationId"] = json_file["id"] + data_list.append(temp_dict) + + df = pd.DataFrame(data_list) + df = df[df.columns[-1:]].join(df[df.columns[:-1]]) + + start = start_date.replace("-", "") + end = end_date.replace("-", "") + + file_name = f"WEBMESSAGE_{start}-{end}.csv" + df.to_csv( + os.path.join(file_name), + index=False, + sep="\t", + ) + + logger.info("Downloaded the data from the Genesys into the CSV.") + + return [file_name] diff --git a/viadot/tasks/sap_bw.py b/viadot/tasks/sap_bw.py new file mode 100644 index 000000000..4d34f9960 --- /dev/null +++ b/viadot/tasks/sap_bw.py @@ -0,0 +1,122 @@ +import pandas as pd +from prefect import Task +from prefect.tasks.secrets import PrefectSecret +from prefect.utilities import logging + +from viadot.exceptions import ValidationError +from viadot.sources import SAPBW +from viadot.task_utils import * + +logger = logging.get_logger() + + +class SAPBWToDF(Task): + def __init__( + self, + sapbw_credentials: dict = None, + sapbw_credentials_key: str = "SAP", + env: str = "BW", + *args, + **kwargs, + ): + """ + A task for quering the SAP BW (SAP Business Warehouse) source using pyrfc library. 
+ + Args: + sapbw_credentials (dict, optional): Credentials to SAP BW server. Defaults to None. + sapbw_credentials_key (str, optional): Azure KV secret. Defaults to "SAP". + env (str, optional): SAP environment. Defaults to "BW". + """ + if sapbw_credentials is None: + self.sapbw_credentials = credentials_loader.run( + credentials_secret=sapbw_credentials_key + ).get(env) + + else: + self.sapbw_credentials = sapbw_credentials + + super().__init__( + name="sapbw_to_df", + *args, + **kwargs, + ) + + def __call__(self): + """Download SAP BW data to a DF""" + super().__call__(self) + + def apply_user_mapping( + self, df: pd.DataFrame, mapping_dict: dict = {} + ) -> pd.DataFrame: + """ + Function to apply the column mapping defined by user for the output dataframe. + DataFrame will be cut to selected columns - if any other columns need to be included in the output file, + please add them to the mapping dictionary with original names. + + Args: + df (pd.DataFrame): Input dataframe for the column mapping task. + mapping_dict (dict, optional): Dictionary with original and new column names. Defaults to {}. + + Returns: + pd.DataFrame: Output DataFrame with mapped columns. + """ + self.logger.info("Applying user defined mapping for columns...") + df = df[mapping_dict.keys()] + df.columns = mapping_dict.values() + + self.logger.info(f"Successfully applied user mapping.") + + return df + + def to_df(self, query_output: dict) -> pd.DataFrame: + """ + Function to convert the SAP output in JSON format into a dataframe. + + Args: + query_output (dict): SAP output dictionary in JSON format that contains data rows and column headers. + + Raises: + ValidationError: Prints the original SAP error message in case of issues with MDX execution. + + Returns: + pd.DataFrame: Output dataframe. + """ + raw_data = {} + + if query_output["RETURN"]["MESSAGE"] == "": + results = query_output["DATA"] + for cell in results: + if cell["ROW"] not in raw_data: + raw_data[cell["ROW"]] = {} + if "].[" not in cell["DATA"]: + raw_data[cell["ROW"]][cell["COLUMN"]] = cell["DATA"] + rows = [raw_data[row] for row in raw_data] + cols = [x["DATA"] for x in query_output["HEADER"]] + df = pd.DataFrame(data=rows) + df.columns = cols + else: + df = pd.DataFrame() + raise ValidationError(query_output["RETURN"]["MESSAGE"]) + + return df + + def run(self, mdx_query: str, mapping_dict: dict = {}) -> pd.DataFrame: + """ + Task run method. + + Args: + mdx_query (str): MDX query to be passed to SAP BW. + mapping_dict (dict, optional): Mapping dictionary from user in json format. Defaults to {}. + + Returns: + pd.DataFrame: Output DataFrame with applied column mapping. 
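`SAPBWToDF.to_df` above pivots the flat `DATA`/`HEADER` cell list returned by SAP into a DataFrame. A standalone sketch of that pivot with a tiny hand-made payload (column names and values are invented for illustration):

```python
import pandas as pd

# Hand-made stand-in for the RSR_MDX_GET_FLAT_DATA output shape; values are invented.
query_output = {
    "RETURN": {"MESSAGE": ""},
    "HEADER": [
        {"COLUMN": 0, "ROW": 0, "DATA": "[0COMP_CODE].[LEVEL01].[DESCRIPTION]"},
        {"COLUMN": 1, "ROW": 0, "DATA": "[Measures].[Sales]"},
    ],
    "DATA": [
        {"COLUMN": 0, "ROW": 0, "DATA": "Example Company GmbH"},
        {"COLUMN": 1, "ROW": 0, "DATA": "42.0"},
    ],
}

# Same pivot as SAPBWToDF.to_df: one dict per ROW, member paths ("].[") skipped.
raw_data = {}
for cell in query_output["DATA"]:
    raw_data.setdefault(cell["ROW"], {})
    if "].[" not in cell["DATA"]:
        raw_data[cell["ROW"]][cell["COLUMN"]] = cell["DATA"]

df = pd.DataFrame([raw_data[row] for row in raw_data])
df.columns = [header["DATA"] for header in query_output["HEADER"]]
print(df)
```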
+ """ + sap = SAPBW(credentials=self.sapbw_credentials) + + data = sap.get_output_data(mdx_query) + df = self.to_df(data) + + if mapping_dict: + df = self.apply_user_mapping(df, mapping_dict) + + return df diff --git a/viadot/tasks/vid_club.py b/viadot/tasks/vid_club.py index 448df8e41..0814a306f 100644 --- a/viadot/tasks/vid_club.py +++ b/viadot/tasks/vid_club.py @@ -5,10 +5,12 @@ from typing import Any, Dict, List, Literal import pandas as pd -import prefect from prefect import Task from prefect.tasks.secrets import PrefectSecret from prefect.utilities import logging +from prefect.utilities.tasks import defaults_from_attrs + +from viadot.task_utils import * from ..sources import VidClub @@ -18,12 +20,12 @@ class VidClubToDF(Task): def __init__( self, - source: Literal["jobs", "product", "company", "survey"], + source: Literal["jobs", "product", "company", "survey"] = None, credentials: Dict[str, Any] = None, + credentials_secret: str = "VIDCLUB", + vault_name: str = None, from_date: str = "2022-03-22", - to_date: str = "", - if_empty: str = "warn", - retry_delay: timedelta = timedelta(seconds=10), + to_date: str = None, timeout: int = 3600, report_name: str = "vid_club_to_df", *args: List[Any], @@ -33,27 +35,31 @@ def __init__( Task to downloading data from Vid Club APIs to Pandas DataFrame. Args: - source (str): The endpoint source to be accessed, has to be among these: - ['jobs', 'product', 'company', 'survey']. + source (Literal["jobs", "product", "company", "survey"], optional): The endpoint source to be accessed. Defaults to None. credentials (Dict[str, Any], optional): Stores the credentials information. Defaults to None. + credentials_secret (str, optional): The name of the secret in Azure Key Vault or Prefect or local_config file. Defaults to "VIDCLUB". + vault_name (str, optional): For credentials stored in Azure Key Vault. The name of the vault from which to obtain the secret. Defaults to None. from_date (str): Start date for the query, by default is the oldest date in the data, '2022-03-22'. - to_date (str): End date for the query, if empty, datetime.today() will be used. - if_empty (str, optional): What to do if query returns no data. Defaults to "warn". - retry_delay (timedelta, optional): The delay between task retries. Defaults to 10 seconds. + to_date (str, optional): End date for the query. By default None, which will be executed as datetime.today().strftime("%Y-%m-%d") in code. timeout (int, optional): The amount of time (in seconds) to wait while running this task before a timeout occurs. Defaults to 3600. report_name (str, optional): Stores the report name. Defaults to "vid_club_to_df". 
Returns: Pandas DataFrame """ - self.logger = prefect.context.get("logger") self.source = source - self.credentials = credentials self.from_date = from_date self.to_date = to_date - self.if_empty = if_empty - self.retry_delay = retry_delay self.report_name = report_name + self.credentials_secret = credentials_secret + self.vault_name = vault_name + + if credentials is None: + self.credentials = credentials_loader.run( + credentials_secret=credentials_secret, vault_name=vault_name + ) + else: + self.credentials = credentials super().__init__( name=self.report_name, @@ -66,18 +72,67 @@ def __call__(self, *args, **kwargs): """Download Vid Club data to Pandas DataFrame""" return super().__call__(*args, **kwargs) - def run(self) -> pd.DataFrame: + @defaults_from_attrs( + "source", + "credentials", + "from_date", + "to_date", + ) + def run( + self, + source: Literal["jobs", "product", "company", "survey"] = None, + credentials: Dict[str, Any] = None, + from_date: str = "2022-03-22", + to_date: str = None, + items_per_page: int = 100, + region: str = "all", + days_interval: int = 30, + cols_to_drop: List[str] = None, + ) -> pd.DataFrame: """ Task run method. + Args: + source (Literal["jobs", "product", "company", "survey"], optional): The endpoint source to be accessed. Defaults to None. + credentials (Dict[str, Any], optional): Stores the credentials information. Defaults to None. + from_date (str, optional): Start date for the query, by default is the oldest date in the data, '2022-03-22'. + to_date (str, optional): End date for the query. By default None, which will be executed as datetime.today().strftime("%Y-%m-%d") in code. + items_per_page (int, optional): Number of entries per page. 100 entries by default. + region (str, optional): Region filter for the query. Valid inputs: ["bg", "hu", "hr", "pl", "ro", "si", "all"]. Defaults to "all". + days_interval (int, optional): Days specified in date range per api call (test showed that 30-40 is optimal for performance). Defaults to 30. + cols_to_drop (List[str], optional): List of columns to drop. Defaults to None. + + Raises: + KeyError: When DataFrame doesn't contain columns provided in the list of columns to drop. + TypeError: When cols_to_drop is not a list type. + Returns: pd.DataFrame: The query result as a pandas DataFrame. """ - vc_obj = VidClub() + vc_obj = VidClub(credentials=credentials) - vc_dataframe = vc_obj.get_response( - source=self.source, from_date=self.from_date, to_date=self.to_date + vc_dataframe = vc_obj.total_load( + source=source, + from_date=from_date, + to_date=to_date, + items_per_page=items_per_page, + region=region, + days_interval=days_interval, ) + if cols_to_drop is not None: + if isinstance(cols_to_drop, list): + try: + logger.info(f"Dropping following columns: {cols_to_drop}...") + vc_dataframe.drop( + columns=cols_to_drop, inplace=True, errors="raise" + ) + except KeyError as ke: + logger.error( + f"Column(s): {cols_to_drop} don't exist in the DataFrame. No columns were dropped. Returning full DataFrame..." + ) + logger.info(f"Existing columns: {vc_dataframe.columns}") + else: + raise TypeError("Provide columns to drop in a List.") return vc_dataframe
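Finally, a hedged run sketch for the reworked `VidClubToDF` task; credentials are resolved from the default `VIDCLUB` secret, the dates are illustrative, and the dropped column name is hypothetical:

```python
from viadot.tasks import VidClubToDF

task = VidClubToDF(credentials_secret="VIDCLUB")  # default secret name; swap for your own entry

df = task.run(
    source="jobs",
    from_date="2023-01-01",
    to_date="2023-03-31",
    items_per_page=100,
    region="all",
    days_interval=30,
    cols_to_drop=["exampleColumn"],  # hypothetical column; a missing column only logs an error
)
print(df.shape)
```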