Skip to content

Commit

Permalink
log timing for report execution
Browse files Browse the repository at this point in the history
  • Loading branch information
eschutho committed Oct 26, 2024
1 parent 299cea0 commit d78a1c9
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 14 deletions.
8 changes: 7 additions & 1 deletion superset/tasks/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,18 @@ def execute(self: Celery.task, report_schedule_id: int) -> None:
task_id = None
try:
task_id = execute.request.id
scheduled_dttm = execute.request.eta
scheduled_dttm = datetime.strptime(execute.request.eta, "%Y-%m-%d %H:%M:%S")

logger.info(
"Executing alert/report, task id: %s, scheduled_dttm: %s",
task_id,
scheduled_dttm,
)
current_time = datetime.now(tz=timezone.utc)
stats_logger.timing(
"reporting.time_to_execution",
(current_time - scheduled_dttm).total_seconds() * 1000,
)
AsyncExecuteReportScheduleCommand(
task_id,
report_schedule_id,
Expand Down
66 changes: 53 additions & 13 deletions tests/integration_tests/reports/scheduler_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
# specific language governing permissions and limitations
# under the License.

from datetime import datetime, timezone
from random import randint
from unittest.mock import patch
from unittest.mock import MagicMock, patch

import pytest
from flask import current_app
from flask_appbuilder.security.sqla.models import User
from freezegun import freeze_time
from freezegun.api import FakeDatetime
Expand Down Expand Up @@ -152,7 +154,10 @@ def test_scheduler_feature_flag_off(execute_mock, is_feature_enabled, owners):
@patch("superset.commands.report.execute.AsyncExecuteReportScheduleCommand.__init__")
@patch("superset.commands.report.execute.AsyncExecuteReportScheduleCommand.run")
@patch("superset.tasks.scheduler.execute.update_state")
def test_execute_task(update_state_mock, command_mock, init_mock, owners):
@patch("celery.app.task.Task.request", new_callable=MagicMock)
def test_execute_task_unexpected_error(
request_mock, update_state_mock, command_mock, init_mock, owners
):
from superset.commands.report.exceptions import ReportScheduleUnexpectedError

report_schedule = insert_report_schedule(
Expand All @@ -162,11 +167,41 @@ def test_execute_task(update_state_mock, command_mock, init_mock, owners):
timezone="America/New_York",
owners=owners,
)
request_mock.eta = "2020-01-01T08:59:00+00:00"
init_mock.return_value = None
command_mock.side_effect = ReportScheduleUnexpectedError("Unexpected error")
with freeze_time("2020-01-01T09:00:00Z"):
execute(report_schedule.id)
update_state_mock.assert_called_with(state="FAILURE")
with patch.object(current_app.config["STATS_LOGGER"], "timing") as statsd_mock:
with freeze_time("2020-01-01T09:00:00Z"):
execute(report_schedule.id)
update_state_mock.assert_called_with(state="FAILURE")
statsd_mock.assert_called_with("reporting.time_to_execution", 60000.0)

db.session.delete(report_schedule)
db.session.commit()


@pytest.mark.usefixtures("app_context")
@patch("superset.commands.report.execute.AsyncExecuteReportScheduleCommand.__init__")
@patch("superset.commands.report.execute.AsyncExecuteReportScheduleCommand.run")
@patch("superset.tasks.scheduler.execute.update_state")
@patch("celery.app.task.Task.request", new_callable=MagicMock)
def test_execute_task_success(
request_mock, update_state_mock, command_mock, init_mock, owners
):
report_schedule = insert_report_schedule(
type=ReportScheduleType.ALERT,
name=f"report-{randint(0,1000)}",
crontab="0 4 * * *",
timezone="America/New_York",
owners=owners,
)
request_mock.eta = datetime(2020, 1, 1, 8, 59, 0, tzinfo=timezone.utc)
init_mock.return_value = None
with patch.object(current_app.config["STATS_LOGGER"], "timing") as statsd_mock:
with freeze_time("2020-01-01T09:00:00Z"):
execute(report_schedule.id)
update_state_mock.assert_not_called()
statsd_mock.assert_called_with("reporting.time_to_execution", 60000.0)

db.session.delete(report_schedule)
db.session.commit()
Expand All @@ -177,8 +212,9 @@ def test_execute_task(update_state_mock, command_mock, init_mock, owners):
@patch("superset.commands.report.execute.AsyncExecuteReportScheduleCommand.run")
@patch("superset.tasks.scheduler.execute.update_state")
@patch("superset.utils.log.logger")
@patch("celery.app.task.Task.request", new_callable=MagicMock)
def test_execute_task_with_command_exception(
logger_mock, update_state_mock, command_mock, init_mock, owners
request_mock, logger_mock, update_state_mock, command_mock, init_mock, owners
):
from superset.commands.exceptions import CommandException

Expand All @@ -189,15 +225,19 @@ def test_execute_task_with_command_exception(
timezone="America/New_York",
owners=owners,
)
request_mock.eta = datetime(2020, 1, 1, 8, 59, 0, tzinfo=timezone.utc)
request_mock.id = "task_id"
init_mock.return_value = None
command_mock.side_effect = CommandException("Unexpected error")
with freeze_time("2020-01-01T09:00:00Z"):
execute(report_schedule.id)
update_state_mock.assert_called_with(state="FAILURE")
logger_mock.exception.assert_called_with(
"A downstream exception occurred while generating a report: None. Unexpected error",
exc_info=True,
)
with patch.object(current_app.config["STATS_LOGGER"], "timing") as statsd_mock:
with freeze_time("2020-01-01T09:00:00Z"):
execute(report_schedule.id)
update_state_mock.assert_called_with(state="FAILURE")
logger_mock.exception.assert_called_with(
"A downstream exception occurred while generating a report: task_id. Unexpected error",
exc_info=True,
)
statsd_mock.assert_called_with("reporting.time_to_execution", 60000.0)

db.session.delete(report_schedule)
db.session.commit()

0 comments on commit d78a1c9

Please sign in to comment.