Skip to content

Commit

Permalink
[AIRFLOW-5615] Reduce duplicated logic around job heartbeating (apach…
Browse files Browse the repository at this point in the history
…e#6311)

Both SchedulerJob and LocalTaskJob have their own timers and decide when
to call heartbeat based upon that. This makes those functions harder to
follow, (and the logs more confusing) so I've moved the logic to BaseJob
  • Loading branch information
ashb authored May 27, 2020
1 parent 30b12a9 commit 8ac90b0
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 13 deletions.
15 changes: 12 additions & 3 deletions airflow/jobs/base_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ def heartbeat_callback(self, session=None):
Callback that is called during heartbeat. This method should be overwritten.
"""

def heartbeat(self):
def heartbeat(self, only_if_necessary: bool = False):
"""
Heartbeats update the job's entry in the database with a timestamp
for the latest_heartbeat and allows for the job to be killed
Expand All @@ -165,7 +165,18 @@ def heartbeat(self):
will sleep 50 seconds to complete the 60 seconds and keep a steady
heart rate. If you go over 60 seconds before calling it, it won't
sleep at all.
:param only_if_necessary: If the heartbeat is not yet due then do
nothing (don't update column, don't call ``heartbeat_callback``)
:type only_if_necessary: boolean
"""
seconds_remaining = 0
if self.latest_heartbeat:
seconds_remaining = self.heartrate - (timezone.utcnow() - self.latest_heartbeat).total_seconds()

if seconds_remaining > 0 and only_if_necessary:
return

previous_heartbeat = self.latest_heartbeat

try:
Expand Down Expand Up @@ -215,9 +226,7 @@ def run(self):
self.state = State.RUNNING
session.add(self)
session.commit()
id_ = self.id
make_transient(self)
self.id = id_

try:
self._execute()
Expand Down
10 changes: 1 addition & 9 deletions airflow/jobs/scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -1615,9 +1615,6 @@ def _run_scheduler_loop(self):
:rtype: None
"""
# Last time that self.heartbeat() was called.
last_self_heartbeat_time = timezone.utcnow()

is_unit_test = conf.getboolean('core', 'unit_test_mode')

# For the execute duration, parse and schedule DAGs
Expand All @@ -1642,12 +1639,7 @@ def _run_scheduler_loop(self):
continue

# Heartbeat the scheduler periodically
time_since_last_heartbeat = (timezone.utcnow() -
last_self_heartbeat_time).total_seconds()
if time_since_last_heartbeat > self.heartrate:
self.log.debug("Heartbeating the scheduler")
self.heartbeat()
last_self_heartbeat_time = timezone.utcnow()
self.heartbeat(only_if_necessary=True)

self._emit_pool_metrics()

Expand Down
38 changes: 38 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
import subprocess
import sys
from contextlib import ExitStack
from datetime import datetime, timedelta

import freezegun
import pytest

# We should set these before loading _any_ of the rest of airflow so that the
Expand Down Expand Up @@ -415,3 +417,39 @@ def pytest_runtest_setup(item):
skip_quarantined_test(item)
skip_if_credential_file_missing(item)
skip_if_airflow_2_test(item)


@pytest.fixture
def frozen_sleep(monkeypatch):
"""
Use freezegun to "stub" sleep, so that it takes no time, but that
``datetime.now()`` appears to move forwards
If your module under test does ``import time`` and then ``time.sleep``::
def test_something(frozen_sleep):
my_mod.fn_under_test()
If your module under test does ``from time import sleep`` then you will
have to mock that sleep function directly::
def test_something(frozen_sleep, monkeypatch):
monkeypatch.setattr('my_mod.sleep', frozen_sleep)
my_mod.fn_under_test()
"""
freezegun_control = None

def fake_sleep(seconds):
nonlocal freezegun_control
utcnow = datetime.utcnow()
if freezegun_control is not None:
freezegun_control.stop()
freezegun_control = freezegun.freeze_time(utcnow + timedelta(seconds=seconds))
freezegun_control.start()

monkeypatch.setattr("time.sleep", fake_sleep)
yield fake_sleep

if freezegun_control is not None:
freezegun_control.stop()
21 changes: 20 additions & 1 deletion tests/jobs/test_base_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import datetime

from mock import Mock, patch
from mock import ANY, Mock, patch
from pytest import raises
from sqlalchemy.exc import OperationalError

Expand Down Expand Up @@ -138,3 +138,22 @@ def test_essential_attr(self, mock_getuser, mock_hostname, mock_default_executor
assert test_job.unixname == "testuser"
assert test_job.state == "running"
assert test_job.executor == mock_sequential_executor

def test_heartbeat(self, frozen_sleep, monkeypatch):
monkeypatch.setattr('airflow.jobs.base_job.sleep', frozen_sleep)
with create_session() as session:
job = MockJob(None, heartrate=10)
job.latest_heartbeat = timezone.utcnow()
session.add(job)
session.commit()

hb_callback = Mock()
job.heartbeat_callback = hb_callback

job.heartbeat()

hb_callback.assert_called_once_with(session=ANY)

hb_callback.reset_mock()
job.heartbeat(only_if_necessary=True)
assert hb_callback.called is False

0 comments on commit 8ac90b0

Please sign in to comment.