diff --git a/airflow/jobs/job.py b/airflow/jobs/job.py index b215b3435ba1c..b390d0414cd1c 100644 --- a/airflow/jobs/job.py +++ b/airflow/jobs/job.py @@ -265,6 +265,8 @@ def _merge_from(self, job: Job | JobPydantic | None): def _heartrate(job_type: str) -> float: if job_type == "TriggererJob": return conf.getfloat("triggerer", "JOB_HEARTBEAT_SEC") + elif job_type == "SchedulerJob": + return conf.getfloat("scheduler", "SCHEDULER_HEARTBEAT_SEC") else: # Heartrate used to be hardcoded to scheduler, so in all other # cases continue to use that value for back compat diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index 32cc9f5a634ab..9a5ba78b6f65f 100644 --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ -150,7 +150,6 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin): """ job_type = "SchedulerJob" - heartrate: int = conf.getint("scheduler", "SCHEDULER_HEARTBEAT_SEC") def __init__( self, diff --git a/tests/jobs/test_base_job.py b/tests/jobs/test_base_job.py index 8f7237ffc6871..62e0369791b60 100644 --- a/tests/jobs/test_base_job.py +++ b/tests/jobs/test_base_job.py @@ -97,9 +97,12 @@ def abort(): ) def test_heart_rate_after_fetched_from_db(self, job_runner, job_type, job_heartbeat_sec): """Ensure heartrate is set correctly after jobs are queried from the DB""" - with create_session() as session, conf_vars( - {(job_type.lower(), "job_heartbeat_sec"): job_heartbeat_sec} - ): + if job_type == "scheduler": + config_name = "scheduler_heartbeat_sec" + else: + config_name = "job_heartbeat_sec" + + with create_session() as session, conf_vars({(job_type.lower(), config_name): job_heartbeat_sec}): job = Job() job_runner(job=job) session.add(job) diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index 08d994d50c828..2c57ca2a9f362 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -193,6 +193,16 @@ def test_is_alive(self, configs): not scheduler_job.is_alive() ), "Completed jobs even with recent heartbeat should not be alive" + @pytest.mark.parametrize( + "heartrate", + [10, 5], + ) + def test_heartrate(self, heartrate): + with conf_vars({("scheduler", "scheduler_heartbeat_sec"): str(heartrate)}): + scheduler_job = Job(executor=self.null_exec) + _ = SchedulerJobRunner(job=scheduler_job) + assert scheduler_job.heartrate == heartrate + def run_single_scheduler_loop_with_no_dags(self, dags_folder): """ Utility function that runs a single scheduler loop without actually