Skip to content

Commit

Permalink
Fix a bug where scheduler heartrate parameter were not used (#37992)
Browse files Browse the repository at this point in the history
Sinc #30255 scheduler heartrate has not been properly calculated.
We missed the check for SchedulerJob type and setting heartrate
value from `scheduler_health_check_threshold`.

This PR fixes it.

Fix: #37971
  • Loading branch information
potiuk authored Mar 8, 2024
1 parent ea5238a commit 01e40ab
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 4 deletions.
2 changes: 2 additions & 0 deletions airflow/jobs/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,8 @@ def _merge_from(self, job: Job | JobPydantic | None):
def _heartrate(job_type: str) -> float:
if job_type == "TriggererJob":
return conf.getfloat("triggerer", "JOB_HEARTBEAT_SEC")
elif job_type == "SchedulerJob":
return conf.getfloat("scheduler", "SCHEDULER_HEARTBEAT_SEC")
else:
# Heartrate used to be hardcoded to scheduler, so in all other
# cases continue to use that value for back compat
Expand Down
1 change: 0 additions & 1 deletion airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
"""

job_type = "SchedulerJob"
heartrate: int = conf.getint("scheduler", "SCHEDULER_HEARTBEAT_SEC")

def __init__(
self,
Expand Down
9 changes: 6 additions & 3 deletions tests/jobs/test_base_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,12 @@ def abort():
)
def test_heart_rate_after_fetched_from_db(self, job_runner, job_type, job_heartbeat_sec):
"""Ensure heartrate is set correctly after jobs are queried from the DB"""
with create_session() as session, conf_vars(
{(job_type.lower(), "job_heartbeat_sec"): job_heartbeat_sec}
):
if job_type == "scheduler":
config_name = "scheduler_heartbeat_sec"
else:
config_name = "job_heartbeat_sec"

with create_session() as session, conf_vars({(job_type.lower(), config_name): job_heartbeat_sec}):
job = Job()
job_runner(job=job)
session.add(job)
Expand Down
10 changes: 10 additions & 0 deletions tests/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,16 @@ def test_is_alive(self, configs):
not scheduler_job.is_alive()
), "Completed jobs even with recent heartbeat should not be alive"

@pytest.mark.parametrize(
"heartrate",
[10, 5],
)
def test_heartrate(self, heartrate):
with conf_vars({("scheduler", "scheduler_heartbeat_sec"): str(heartrate)}):
scheduler_job = Job(executor=self.null_exec)
_ = SchedulerJobRunner(job=scheduler_job)
assert scheduler_job.heartrate == heartrate

def run_single_scheduler_loop_with_no_dags(self, dags_folder):
"""
Utility function that runs a single scheduler loop without actually
Expand Down

0 comments on commit 01e40ab

Please sign in to comment.