Skip to content

LITE-28603: Sending logger to main process that execute tasks #118

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 17 additions & 12 deletions connect_ext_ppr/tasks_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
from connect_ext_ppr.constants import PPR_FILE_NAME_DELEGATION_L2, PPR_FILE_NAME_UPDATE_MARKETPLACES
from connect_ext_ppr.db import get_cbc_extension_db, get_cbc_extension_db_engine, get_db_ctx_manager
from connect_ext_ppr.models.configuration import Configuration
from connect_ext_ppr.models.enums import CBCTaskLogStatus
from connect_ext_ppr.models.enums import (
CBCTaskLogStatus,
DeploymentRequestStatusChoices,
DeploymentStatusChoices,
TasksStatusChoices,
Expand Down Expand Up @@ -390,7 +390,7 @@ def delegate_to_l2(deployment_request, cbc_service, connect_client, **kwargs):
}


def execute_tasks(db, config, tasks, connect_client): # noqa: CCR001
def execute_tasks(db, config, tasks, connect_client, logger):
was_succesfull = False
cbc_service = None

Expand All @@ -416,27 +416,28 @@ def execute_tasks(db, config, tasks, connect_client): # noqa: CCR001
db=db,
)
task.status = TasksStatusChoices.done
if not was_succesfull:
task.status = TasksStatusChoices.error

except TaskException as ex:
was_succesfull = False
task.error_message = str(ex)
task.status = TasksStatusChoices.error
except Exception as err:
task.error_message = str(ex)[:4000]
except Exception as ex:
logger.error(f'Task ID: {task.id} - {ex}')
was_succesfull = False
task.error_message = str(err)
task.status = TasksStatusChoices.error
task.error_message = 'Something went wrong.'

task.finished_at = datetime.utcnow()
db.add(task)
db.commit()

if not was_succesfull:
task.status = TasksStatusChoices.error
db.commit()
break
db.commit()

return was_succesfull


def main_process(deployment_request_id, config, connect_client):
def main_process(deployment_request_id, config, connect_client, logger):

with get_db_ctx_manager(config) as db:
deployment_request = db.query(DeploymentRequest).options(
Expand All @@ -461,7 +462,11 @@ def main_process(deployment_request_id, config, connect_client):
deployment_request_id=deployment_request_id,
).order_by(Task.id).all()

was_succesfull = execute_tasks(db, config, tasks, connect_client)
try:
was_succesfull = execute_tasks(db, config, tasks, connect_client, logger)
except Exception as ex:
was_succesfull = False
logger.error(f'DeploymentRequest ID: {deployment_request_id} - {ex}')

db.refresh(deployment_request, with_for_update=True)

Expand Down
5 changes: 3 additions & 2 deletions connect_ext_ppr/webapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ def add_dep_request(
db, deployment_request, deployment, account_id, logger,
)

self.thread_pool.submit(main_process, instance.id, config, client)
self.thread_pool.submit(main_process, instance.id, config, client, logger)

hub = get_client_object(client, 'hubs', instance.deployment.hub_id)
response = get_deployment_request_schema(instance, hub)
Expand Down Expand Up @@ -341,11 +341,12 @@ def retry(
client: ConnectClient = Depends(get_installation_client),
installation: dict = Depends(get_installation),
config: dict = Depends(get_config),
logger: Logger = Depends(get_logger),
):
dr = get_deployment_request_by_id(depl_req_id, db, installation)
dr = DeploymentRequestActionHandler.retry(db, dr)

self.thread_pool.submit(main_process, dr.id, config, client)
self.thread_pool.submit(main_process, dr.id, config, client, logger)

hub = get_hub(client, dr.deployment.hub_id)
return get_deployment_request_schema(dr, hub)
Expand Down
27 changes: 17 additions & 10 deletions tests/test_tasks_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -855,6 +855,7 @@ def test_main_process(
mock_tasks,
mocker,
product_details,
logger,
):
mock_get_product_details.return_value = product_details
dep = deployment_factory()
Expand All @@ -865,7 +866,7 @@ def test_main_process(
task_factory(deployment_request=dr, task_index='0003', type=TaskTypesChoices.delegate_to_l2)

mocker.patch('connect_ext_ppr.tasks_manager._get_cbc_service', return_value=CBCService())
assert main_process(dr.id, {}, connect_client) == DeploymentRequestStatusChoices.done
assert main_process(dr.id, {}, connect_client, logger) == DeploymentRequestStatusChoices.done

assert dbsession.query(Deployment).filter_by(status=DeploymentStatusChoices.synced).count() == 1
assert dbsession.query(DeploymentRequest).filter_by(
Expand All @@ -892,6 +893,7 @@ def test_main_process_wo_l2_delegation(
mock_tasks,
mocker,
product_details,
logger,
):
mock_get_product_details.return_value = product_details
dep = deployment_factory()
Expand All @@ -901,7 +903,7 @@ def test_main_process_wo_l2_delegation(
task_factory(deployment_request=dr, task_index='0002', type=TaskTypesChoices.apply_and_delegate)

mocker.patch('connect_ext_ppr.tasks_manager._get_cbc_service', return_value=CBCService())
assert main_process(dr.id, {}, connect_client) == DeploymentRequestStatusChoices.done
assert main_process(dr.id, {}, connect_client, logger) == DeploymentRequestStatusChoices.done

assert dbsession.query(Deployment).filter_by(
status=DeploymentStatusChoices.pending,
Expand Down Expand Up @@ -931,6 +933,7 @@ def test_main_process_deployment_w_new_ppr_version(
mock_tasks,
mocker,
product_details,
logger,
):
mock_get_product_details.return_value = product_details
ppr_file = file_factory(id='MFL-123')
Expand All @@ -944,7 +947,7 @@ def test_main_process_deployment_w_new_ppr_version(
task_factory(deployment_request=dr, task_index='0003', type=TaskTypesChoices.delegate_to_l2)

mocker.patch('connect_ext_ppr.tasks_manager._get_cbc_service', return_value=CBCService())
assert main_process(dr.id, {}, connect_client) == DeploymentRequestStatusChoices.done
assert main_process(dr.id, {}, connect_client, logger) == DeploymentRequestStatusChoices.done

assert dbsession.query(Deployment).filter_by(
status=DeploymentStatusChoices.pending,
Expand Down Expand Up @@ -981,7 +984,7 @@ def test_main_process_ends_w_error(
task_factory,
ppr_version_factory,
connect_client,
mock_tasks,
logger,
):
dep = deployment_factory()
ppr = ppr_version_factory(id='PPR-123', product_version=1, deployment=dep, version=1)
Expand All @@ -993,12 +996,13 @@ def test_main_process_ends_w_error(
my_mock = mocker.Mock()

def mock_get(key):
print(key, ' :', key != type_function_to_mock)
return lambda **kwargs: key != type_function_to_mock
my_mock.get = mock_get

mocker.patch('connect_ext_ppr.tasks_manager._get_cbc_service', return_value=CBCService())
mocker.patch('connect_ext_ppr.tasks_manager.TASK_PER_TYPE', my_mock)
assert main_process(dr.id, {}, connect_client) == DeploymentRequestStatusChoices.error
assert main_process(dr.id, {}, connect_client, logger) == DeploymentRequestStatusChoices.error

assert dbsession.query(Deployment).filter_by(
status=DeploymentStatusChoices.pending,
Expand Down Expand Up @@ -1031,6 +1035,7 @@ def test_main_process_wo_hub_credentials(
task_factory,
connect_client,
mocker,
logger,
):
dep = deployment_factory()
ppr = ppr_version_factory(id='PPR-123', product_version=1, deployment=dep, version=1)
Expand All @@ -1039,7 +1044,7 @@ def test_main_process_wo_hub_credentials(
deployment_request=dr, task_index='0001', type=TaskTypesChoices.product_setup,
)
mocker.patch('connect_ext_ppr.client.utils.get_hub_credentials', return_value=None)
assert main_process(dr.id, {}, connect_client) == DeploymentRequestStatusChoices.error
assert main_process(dr.id, {}, connect_client, logger) == DeploymentRequestStatusChoices.error

assert task.status == TasksStatusChoices.error
assert task.error_message == 'Hub Credentials not found for Hub ID HB-0000-0000.'
Expand Down Expand Up @@ -1079,6 +1084,7 @@ def test_main_process_w_aborted_tasks(
connect_client,
mock_tasks,
mocker,
logger,
):
"""
We only process DeploymentRequest that are in Pending status. So in this case we asume that
Expand Down Expand Up @@ -1120,7 +1126,7 @@ def change_dr_status(instance, attribute_names=None, with_for_update=None):
dbsession.refresh = change_dr_status
mocker.patch('connect_ext_ppr.tasks_manager._get_cbc_service', return_value=CBCService())

assert main_process(dr.id, {}, connect_client) == DeploymentRequestStatusChoices.aborted
assert main_process(dr.id, {}, connect_client, logger) == DeploymentRequestStatusChoices.aborted

assert dbsession.query(Deployment).filter_by(
status=DeploymentStatusChoices.pending,
Expand All @@ -1147,6 +1153,7 @@ def test_main_process_w_aborted_deployment_request(
ppr_version_factory,
connect_client,
mock_tasks,
logger,
):
"""
We only process DeploymentRequest that are in Pending status. So in this case we asume that
Expand Down Expand Up @@ -1180,7 +1187,7 @@ def test_main_process_w_aborted_deployment_request(
status=TasksStatusChoices.aborted,
)

assert main_process(dr.id, {}, connect_client) == DeploymentRequestStatusChoices.aborted
assert main_process(dr.id, {}, connect_client, logger) == DeploymentRequestStatusChoices.aborted

assert dbsession.query(Deployment).filter_by(
status=DeploymentStatusChoices.pending,
Expand Down Expand Up @@ -1216,7 +1223,7 @@ def test_main_process_ends_w_task_exception(
task_factory,
ppr_version_factory,
connect_client,
mock_tasks,
logger,
):
dep = deployment_factory()
ppr = ppr_version_factory(id='PPR-123', product_version=1, deployment=dep, version=1)
Expand All @@ -1236,7 +1243,7 @@ def mock_get(key):

mocker.patch('connect_ext_ppr.tasks_manager.TASK_PER_TYPE', my_mock)
mocker.patch('connect_ext_ppr.tasks_manager._get_cbc_service', return_value=CBCService())
assert main_process(dr.id, {}, connect_client) == DeploymentRequestStatusChoices.error
assert main_process(dr.id, {}, connect_client, logger) == DeploymentRequestStatusChoices.error

assert dbsession.query(Deployment).filter_by(
status=DeploymentStatusChoices.pending,
Expand Down